Syncing core channel pipeline elements up to date.

Motivation:
initial port was based on now dropped netty 5.0. This is an effort to bring core channel model artifacts to terms  with ongoing development in netty 4.1. As a result future sync efforts should be much lower.

Modifications:
- Rewrote DefaultChannelPipeline (port as much as possible)
- Updated AbstractChannelHandlerContext (Skip handling is preserved from netty 5.0)
- Revisited AbstractChannel
- Extra: fixes in benchmark
- Extra: tweaking spin count on STEE

Result:
Core channel model elements are up to speed on recent modifications in netty.
This commit is contained in:
Max Gortman 2016-05-05 16:37:05 -07:00
Родитель 45c8c70649
Коммит ab72d4af7b
20 изменённых файлов: 862 добавлений и 1014 удалений

Просмотреть файл

@ -1,28 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
/// <summary>
/// Implement this interface if you need your {@link EventExecutor} implementation to be able
/// to reject new work.
/// </summary>
public interface IPausableEventExecutor : IWrappedEventExecutor
{
/// <summary>
/// After a call to this method the {@link EventExecutor} may throw a {@link RejectedExecutionException} when
/// attempting to assign new work to it (i.e. through a call to {@link EventExecutor#execute(Runnable)}).
/// </summary>
void RejectNewTasks();
/// <summary>
/// With a call to this method the {@link EventExecutor} signals that it is now accepting new work.
/// </summary>
void AcceptNewTasks();
/// <summary>
/// Returns {@code true} if and only if this {@link EventExecutor} is accepting a new task.
/// </summary>
bool IsAcceptingNewTasks { get; }
}
}

Просмотреть файл

@ -31,7 +31,7 @@ namespace DotNetty.Common.Concurrency
volatile int executionState = ST_NOT_STARTED;
readonly PreciseTimeSpan preciseBreakoutInterval;
PreciseTimeSpan lastExecutionTime;
readonly ManualResetEventSlim emptyEvent = new ManualResetEventSlim();
readonly ManualResetEventSlim emptyEvent = new ManualResetEventSlim(false, 1);
readonly TaskScheduler scheduler;
readonly TaskCompletionSource terminationCompletionSource;
PreciseTimeSpan gracefulShutdownStartTime;

Просмотреть файл

@ -167,13 +167,13 @@
<Compile Include="Internal\IQueue.cs" />
<Compile Include="Internal\ConcurrentCircularArrayQueue.cs" />
<Compile Include="Internal\MpscArrayQueue.cs" />
<Compile Include="Internal\OneTimeTask.cs" />
<Compile Include="Internal\PlatformDependent.cs" />
<Compile Include="Properties\Friends.cs" />
<Compile Include="Concurrency\AbstractEventExecutor.cs" />
<Compile Include="Concurrency\AbstractScheduledEventExecutor.cs" />
<Compile Include="Concurrency\ExecutorTaskScheduler.cs" />
<Compile Include="Concurrency\IEventExecutor.cs" />
<Compile Include="Concurrency\IPausableEventExecutor.cs" />
<Compile Include="Concurrency\IRunnable.cs" />
<Compile Include="Concurrency\IWrappedEventExecutor.cs" />
<Compile Include="Concurrency\SingleThreadEventExecutor.cs" />

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Internal
{
using DotNetty.Common.Concurrency;
using DotNetty.Common.Utilities;
/// <summary>
/// <see cref="IRunnable" /> which represent a one time task which may allow the <see cref="IEventExecutor" /> to
/// reduce the amount of
/// produced garbage when queue it for execution.
/// <strong>It is important this will not be reused. After submitted it is not allowed to get submitted again!</strong>
/// </summary>
public abstract class OneTimeTask : MpscLinkedQueueNode<IRunnable>, IRunnable
{
public override IRunnable Value => this;
public abstract void Run();
}
}

Просмотреть файл

@ -86,6 +86,9 @@ namespace DotNetty.Common
/// </summary>
public static DetectionLevel Level { get; set; }
/// Returns <c>true</c> if resource leak detection is enabled.
public static bool Enabled => Level > DetectionLevel.Disabled;
readonly ConcurrentDictionary<string, bool> reportedLeaks = new ConcurrentDictionary<string, bool>();
readonly string resourceType;

Просмотреть файл

@ -29,7 +29,7 @@ namespace DotNetty.Transport.Channels
volatile EndPoint localAddress;
volatile EndPoint remoteAddress;
volatile PausableChannelEventLoop eventLoop;
volatile IEventLoop eventLoop;
volatile bool registered;
/// <summary> Cache for the string representation of this channel /// </summary>
@ -43,8 +43,11 @@ namespace DotNetty.Transport.Channels
/// the parent of this channel. {@code null} if there's no parent.
/// </summary>
protected AbstractChannel(IChannel parent)
: this(parent, DefaultChannelId.NewInstance())
{
this.Parent = parent;
this.Id = this.NewId();
this.channelUnsafe = this.NewUnsafe();
this.pipeline = new DefaultChannelPipeline(this);
}
//* @param parent
@ -161,6 +164,10 @@ namespace DotNetty.Transport.Channels
/// </summary>
public bool Registered => this.registered;
/// Returns a new <see cref="DefaultChannelId"/> instance. Subclasses may override this method to assign custom
/// <see cref="IChannelId"/>s to <see cref="IChannel"/>s that use the <see cref="AbstractChannel"/> constructor.
protected IChannelId NewId() => DefaultChannelId.NewInstance();
public virtual Task BindAsync(EndPoint localAddress)
{
return this.pipeline.BindAsync(localAddress);
@ -188,27 +195,6 @@ namespace DotNetty.Transport.Channels
public Task DeregisterAsync()
{
/// <summary>
/// One problem of channel deregistration is that after a channel has been deregistered
/// there may still be tasks, created from within one of the channel's ChannelHandlers,
/// input the {@link EventLoop}'s task queue. That way, an unfortunate twist of events could lead
/// to tasks still being input the old {@link EventLoop}'s queue even after the channel has been
/// registered with a new {@link EventLoop}. This would lead to the tasks being executed by two
/// different {@link EventLoop}s.
///
/// Our solution to this problem is to always perform the actual deregistration of
/// the channel as a task and to reject any submission of new tasks, from within
/// one of the channel's ChannelHandlers, until the channel is registered with
/// another {@link EventLoop}. That way we can be sure that there are no more tasks regarding
/// that particular channel after it has been deregistered (because the deregistration
/// task is the last one.).
///
/// This only works for one time tasks. To see how we handle periodic/delayed tasks have a look
/// at {@link io.netty.util.concurrent.ScheduledFutureTask#run()}.
///
/// Also see {@link HeadContext#deregister(ChannelHandlerContext, ChannelPromise)}.
/// </summary>
this.eventLoop.RejectNewTasks();
return this.pipeline.DeregisterAsync();
}
@ -247,32 +233,18 @@ namespace DotNetty.Transport.Channels
/// <summary>
/// Returns the ID of this channel.
/// </summary>
public override int GetHashCode()
{
return this.Id.GetHashCode();
}
public override int GetHashCode() => this.Id.GetHashCode();
/// <summary>
/// Returns {@code true} if and only if the specified object is identical
/// with this channel (i.e: {@code this == o}).
/// Returns <c>true</c> if and only if the specified object is identical
/// with this channel (i.e. <c>this == o</c>).
/// </summary>
public override bool Equals(object o)
{
return this == o;
}
public override bool Equals(object o) => this == o;
public int CompareTo(IChannel o)
{
if (ReferenceEquals(this, o))
{
return 0;
}
return this.Id.CompareTo(o.Id);
}
public int CompareTo(IChannel o) => ReferenceEquals(this, o) ? 0 : this.Id.CompareTo(o.Id);
/// <summary>
/// Returns the {@link String} representation of this channel. The returned
/// Returns the string representation of this channel. The returned
/// string contains the {@linkplain #hashCode()} ID}, {@linkplain #localAddress() local address},
/// and {@linkplain #remoteAddress() remote address} of this channel for
/// easier identification.
@ -367,28 +339,23 @@ namespace DotNetty.Transport.Channels
public ChannelOutboundBuffer OutboundBuffer => this.outboundBuffer;
void AssertEventLoop() => Contract.Assert(!this.channel.registered || this.channel.eventLoop.InEventLoop);
public Task RegisterAsync(IEventLoop eventLoop)
{
Contract.Requires(eventLoop != null);
if (this.channel.Registered)
{
return TaskEx.FromException(new InvalidOperationException("registered to an event loop already"));
}
if (!this.channel.IsCompatible(eventLoop))
{
return TaskEx.FromException(new InvalidOperationException("incompatible event loop type: " + eventLoop.GetType().Name));
}
// It's necessary to reuse the wrapped eventloop object. Otherwise the user will end up with multiple
// objects that do not share a common state.
if (this.channel.eventLoop == null)
{
this.channel.eventLoop = new PausableChannelEventLoop(this.channel, eventLoop);
}
else
{
this.channel.eventLoop.Unwrapped = eventLoop;
}
this.channel.eventLoop = eventLoop;
var promise = new TaskCompletionSource();
@ -400,13 +367,11 @@ namespace DotNetty.Transport.Channels
{
try
{
eventLoop.Execute(() => this.Register0(promise));
eventLoop.Execute((u, p) => ((AbstractUnsafe)u).Register0((TaskCompletionSource)p), this, promise);
}
catch (Exception ex)
{
Logger.Warn(
$"Force-closing a channel whose registration task was not accepted by an event loop: {this.channel}",
ex);
Logger.Warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", this.channel, ex);
this.CloseForcibly();
this.channel.closeFuture.Complete();
Util.SafeSetFailure(promise, ex, Logger);
@ -431,14 +396,32 @@ namespace DotNetty.Transport.Channels
this.channel.DoRegister();
this.neverRegistered = false;
this.channel.registered = true;
this.channel.eventLoop.AcceptNewTasks();
if (firstRegistration)
{
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
this.channel.pipeline.CallHandlerAddedForAllHandlers();
}
Util.SafeSetSuccess(promise, Logger);
this.channel.pipeline.FireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && this.channel.Active)
if (this.channel.Active)
{
this.channel.pipeline.FireChannelActive();
if (firstRegistration)
{
this.channel.pipeline.FireChannelActive();
}
else if (this.channel.Configuration.AutoRead)
{
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
this.BeginRead();
}
}
}
catch (Exception t)
@ -452,6 +435,8 @@ namespace DotNetty.Transport.Channels
public Task BindAsync(EndPoint localAddress)
{
this.AssertEventLoop();
// todo: cancellation support
if ( /*!promise.setUncancellable() || */!this.channel.Open)
{
@ -473,16 +458,14 @@ namespace DotNetty.Transport.Channels
//}
bool wasActive = this.channel.Active;
var promise = new TaskCompletionSource();
try
{
this.channel.DoBind(localAddress);
}
catch (Exception t)
{
Util.SafeSetFailure(promise, t, Logger);
this.CloseIfClosed();
return promise.Task;
return TaskEx.FromException(t);
}
if (!wasActive && this.channel.Active)
@ -490,9 +473,7 @@ namespace DotNetty.Transport.Channels
this.InvokeLater(() => this.channel.pipeline.FireChannelActive());
}
this.SafeSetSuccess(promise);
return promise.Task;
return TaskEx.Completed;
}
public abstract Task ConnectAsync(EndPoint remoteAddress, EndPoint localAddress);
@ -504,11 +485,7 @@ namespace DotNetty.Transport.Channels
public Task DisconnectAsync()
{
var promise = new TaskCompletionSource();
if (!promise.setUncancellable())
{
return promise.Task;
}
this.AssertEventLoop();
bool wasActive = this.channel.Active;
try
@ -517,9 +494,8 @@ namespace DotNetty.Transport.Channels
}
catch (Exception t)
{
this.SafeSetFailure(promise, t);
this.CloseIfClosed();
return promise.Task;
return TaskEx.FromException(t);
}
if (wasActive && !this.channel.Active)
@ -527,10 +503,9 @@ namespace DotNetty.Transport.Channels
this.InvokeLater(() => this.channel.pipeline.FireChannelInactive());
}
this.SafeSetSuccess(promise);
this.CloseIfClosed(); // doDisconnect() might have closed the channel
return promise.Task;
return TaskEx.Completed;
}
void SafeSetSuccess(TaskCompletionSource promise)
@ -538,19 +513,23 @@ namespace DotNetty.Transport.Channels
Util.SafeSetSuccess(promise, Logger);
}
public Task CloseAsync() //CancellationToken cancellationToken)
public Task CloseAsync() /*CancellationToken cancellationToken) */
{
this.AssertEventLoop();
return this.CloseAsync(ClosedChannelException, false);
}
Task CloseAsync(Exception cause, bool notify)
{
var promise = new TaskCompletionSource();
if (!promise.setUncancellable())
{
return promise.Task;
}
//if (cancellationToken.IsCancellationRequested)
//{
// return TaskEx.Cancelled;
//}
if (this.outboundBuffer == null)
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null)
{
// Only needed if no VoidChannelPromise.
if (promise != TaskCompletionSource.Void)
@ -569,7 +548,6 @@ namespace DotNetty.Transport.Channels
}
bool wasActive = this.channel.Active;
ChannelOutboundBuffer buffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
IEventExecutor closeExecutor = null; // todo closeExecutor();
if (closeExecutor != null)
@ -587,9 +565,8 @@ namespace DotNetty.Transport.Channels
this.InvokeLater(() =>
{
// Fail all the queued messages
buffer.FailFlushed(ClosedChannelException,
false);
buffer.Close(ClosedChannelException);
outboundBuffer.FailFlushed(cause, notify);
outboundBuffer.Close(ClosedChannelException);
this.FireChannelInactiveAndDeregister(wasActive);
});
}
@ -605,8 +582,8 @@ namespace DotNetty.Transport.Channels
finally
{
// Fail all the queued messages.
buffer.FailFlushed(ClosedChannelException, false);
buffer.Close(ClosedChannelException);
outboundBuffer.FailFlushed(cause, notify);
outboundBuffer.Close(ClosedChannelException);
}
if (this.inFlush0)
{
@ -638,22 +615,13 @@ namespace DotNetty.Transport.Channels
void FireChannelInactiveAndDeregister(bool wasActive)
{
if (wasActive && !this.channel.Active)
{
this.InvokeLater(() =>
{
this.channel.pipeline.FireChannelInactive();
this.DeregisterAsync();
});
}
else
{
this.InvokeLater(() => this.DeregisterAsync());
}
this.DeregisterAsync(wasActive && !this.channel.Active);
}
public void CloseForcibly()
{
this.AssertEventLoop();
try
{
this.channel.DoClose();
@ -672,6 +640,13 @@ namespace DotNetty.Transport.Channels
/// events. See the comments input {@link #invokeLater(Runnable)} for more details.
/// </summary>
public Task DeregisterAsync()
{
this.AssertEventLoop();
return this.DeregisterAsync(false);
}
Task DeregisterAsync(bool fireChannelInactive)
{
//if (!promise.setUncancellable())
//{
@ -683,34 +658,53 @@ namespace DotNetty.Transport.Channels
return TaskEx.Completed;
}
try
var promise = new TaskCompletionSource();
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
this.InvokeLater(() =>
{
this.channel.DoDeregister();
}
catch (Exception t)
{
Logger.Warn("Unexpected exception occurred while deregistering a channel.", t);
return TaskEx.FromException(t);
}
finally
{
if (this.channel.registered)
try
{
this.channel.registered = false;
this.channel.pipeline.FireChannelUnregistered();
this.channel.DoDeregister();
}
else
catch (Exception t)
{
Logger.Warn("Unexpected exception occurred while deregistering a channel.", t);
}
finally
{
if (fireChannelInactive)
{
this.channel.pipeline.FireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered.
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (this.channel.registered)
{
this.channel.registered = false;
this.channel.pipeline.FireChannelUnregistered();
}
Util.SafeSetSuccess(promise, Logger);
}
}
return TaskEx.Completed;
});
return promise.Task;
}
public void BeginRead()
{
this.AssertEventLoop();
if (!this.channel.Active)
{
return;
@ -729,6 +723,8 @@ namespace DotNetty.Transport.Channels
public Task WriteAsync(object msg)
{
this.AssertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null)
{
@ -766,6 +762,8 @@ namespace DotNetty.Transport.Channels
public void Flush()
{
this.AssertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null)
{
@ -839,10 +837,7 @@ namespace DotNetty.Transport.Channels
return false;
}
protected Task CreateClosedChannelExceptionTask()
{
return TaskEx.FromException(ClosedChannelException);
}
protected Task CreateClosedChannelExceptionTask() => TaskEx.FromException(ClosedChannelException);
protected void CloseIfClosed()
{
@ -886,16 +881,13 @@ namespace DotNetty.Transport.Channels
return exception;
}
// /// <summary>
//* @return {@link EventLoop} to execute {@link #doClose()} or {@code null} if it should be done input the
//* {@link EventLoop}.
//+
///// </summary>
// protected IEventExecutor closeExecutor()
// {
// return null;
// }
/// <summary>
/// Prepares to close the <see cref="IChannel"/>. If this method returns an <see cref="IEventExecutor"/>, the
/// caller must call the <see cref="IEventExecutor.Execute(DotNetty.Common.Concurrency.IRunnable)"/> method with a task that calls
/// <see cref="AbstractChannel.DoClose"/> on the returned <see cref="IEventExecutor"/>. If this method returns <c>null</c>,
/// <see cref="AbstractChannel.DoClose"/> must be called from the caller thread. (i.e. <see cref="IEventLoop"/>)
/// </summary>
protected virtual IEventExecutor PrepareToClose() => null;
}
/// <summary>
@ -954,49 +946,5 @@ namespace DotNetty.Transport.Channels
{
return msg;
}
sealed class PausableChannelEventLoop : PausableChannelEventExecutor, IEventLoop
{
volatile bool isAcceptingNewTasks = true;
public volatile IEventLoop Unwrapped;
readonly IChannel channel;
public PausableChannelEventLoop(IChannel channel, IEventLoop unwrapped)
{
this.channel = channel;
this.Unwrapped = unwrapped;
}
public override void RejectNewTasks()
{
this.isAcceptingNewTasks = false;
}
public override void AcceptNewTasks()
{
this.isAcceptingNewTasks = true;
}
public override bool IsAcceptingNewTasks => this.isAcceptingNewTasks;
public override IEventExecutor Unwrap()
{
return this.Unwrapped;
}
IEventLoop IEventLoop.Unwrap()
{
return this.Unwrapped;
}
public IChannelHandlerInvoker Invoker => this.Unwrapped.Invoker;
public Task RegisterAsync(IChannel c)
{
return this.Unwrapped.RegisterAsync(c);
}
internal override IChannel Channel => this.channel;
}
}
}

Просмотреть файл

@ -8,12 +8,10 @@ namespace DotNetty.Transport.Channels
using System.Net;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Common;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Utilities;
abstract class AbstractChannelHandlerContext : IChannelHandlerContext, IResourceLeakHint
{
@ -165,79 +163,33 @@ namespace DotNetty.Transport.Channels
internal volatile AbstractChannelHandlerContext Prev;
internal readonly int SkipPropagationFlags;
readonly IChannelHandlerInvoker invoker;
volatile PausableChannelEventExecutor wrappedEventLoop;
protected AbstractChannelHandlerContext(IChannelPipeline pipeline, IChannelHandlerInvoker invoker,
readonly DefaultChannelPipeline pipeline;
internal readonly IChannelHandlerInvoker invoker;
protected AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, IChannelHandlerInvoker invoker,
string name, int skipPropagationDirections)
{
Contract.Requires(pipeline != null);
Contract.Requires(name != null);
this.Channel = pipeline.Channel();
this.pipeline = pipeline;
this.Name = name;
this.invoker = invoker;
this.SkipPropagationFlags = skipPropagationDirections;
this.Name = name;
}
public IChannel Channel { get; }
public IChannel Channel => this.pipeline.Channel;
public IByteBufferAllocator Allocator => this.Channel.Allocator;
public bool Removed { get; internal set; }
public IEventExecutor Executor
{
get
{
if (this.invoker == null)
{
return this.Channel.EventLoop;
}
else
{
return this.WrappedEventLoop;
}
}
}
public IEventExecutor Executor => this.Invoker.Executor;
public string Name { get; }
public IChannelHandlerInvoker Invoker
{
get
{
if (this.invoker == null)
{
return this.Channel.EventLoop.Invoker;
}
else
{
throw new NotImplementedException();
//return wrappedEventLoop();
}
}
}
PausableChannelEventExecutor WrappedEventLoop
{
get
{
PausableChannelEventExecutor wrapped = this.wrappedEventLoop;
if (wrapped == null)
{
wrapped = new PausableChannelEventExecutor0(this);
#pragma warning disable 420 // does not apply to Interlocked operations
if (Interlocked.CompareExchange(ref this.wrappedEventLoop, wrapped, null) != null)
#pragma warning restore 420
{
// Set in the meantime so we need to issue another volatile read
return this.wrappedEventLoop;
}
}
return wrapped;
}
}
public IChannelHandlerInvoker Invoker => this.invoker ?? this.Channel.EventLoop.Invoker;
public IChannelHandlerContext FireChannelRegistered()
{
@ -279,8 +231,7 @@ namespace DotNetty.Transport.Channels
public IChannelHandlerContext FireChannelRead(object msg)
{
AbstractChannelHandlerContext target = this.FindContextInbound();
ReferenceCountUtil.Touch(msg, target);
target.Invoker.InvokeChannelRead(target, msg);
target.Invoker.InvokeChannelRead(target, this.pipeline.Touch(msg, target));
return this;
}
@ -321,8 +272,7 @@ namespace DotNetty.Transport.Channels
public Task WriteAsync(object msg) // todo: cancellationToken?
{
AbstractChannelHandlerContext target = this.FindContextOutbound();
ReferenceCountUtil.Touch(msg, target);
return target.Invoker.InvokeWriteAsync(target, msg);
return target.Invoker.InvokeWriteAsync(target, this.pipeline.Touch(msg, target));
}
public IChannelHandlerContext Flush()
@ -336,8 +286,7 @@ namespace DotNetty.Transport.Channels
{
AbstractChannelHandlerContext target;
target = this.FindContextOutbound();
ReferenceCountUtil.Touch(message, target);
Task writeFuture = target.Invoker.InvokeWriteAsync(target, message);
Task writeFuture = target.Invoker.InvokeWriteAsync(target, this.pipeline.Touch(message, target));
target = this.FindContextOutbound();
target.Invoker.InvokeFlush(target);
return writeFuture;
@ -399,56 +348,8 @@ namespace DotNetty.Transport.Channels
return ctx;
}
public string ToHintString()
{
return '\'' + this.Name + "' will handle the message from this point.";
}
public string ToHintString() => $"\'{this.Name}\' will handle the message from this point.";
public override string ToString()
{
return $"{typeof(IChannelHandlerContext).Name} ({this.Name}, {this.Channel})";
}
class PausableChannelEventExecutor0 : PausableChannelEventExecutor
{
readonly AbstractChannelHandlerContext context;
public PausableChannelEventExecutor0(AbstractChannelHandlerContext context)
{
this.context = context;
}
public override void RejectNewTasks()
{
/**
* This cast is correct because {@link #channel()} always returns an {@link AbstractChannel} and
* {@link AbstractChannel#eventLoop()} always returns a {@link PausableChannelEventExecutor}.
*/
((PausableChannelEventExecutor)this.Channel.EventLoop).RejectNewTasks();
}
public override void AcceptNewTasks()
{
((PausableChannelEventExecutor)this.Channel.EventLoop).AcceptNewTasks();
}
public override bool IsAcceptingNewTasks => ((PausableChannelEventExecutor)this.Channel.EventLoop).IsAcceptingNewTasks;
internal override IChannel Channel => this.context.Channel;
public override IEventExecutor Unwrap()
{
return this.UnwrapInvoker().Executor;
}
public IChannelHandlerInvoker UnwrapInvoker()
{
/**
* {@link #invoker} can not be {@code null}, because {@link PausableChannelEventExecutor0} will only be
* instantiated if {@link #invoker} is not {@code null}.
*/
return this.context.invoker;
}
}
public override string ToString() => $"{typeof(IChannelHandlerContext).Name} ({this.Name}, {this.Channel})";
}
}

Просмотреть файл

@ -7,17 +7,15 @@ namespace DotNetty.Transport.Channels
sealed class DefaultChannelHandlerContext : AbstractChannelHandlerContext
{
readonly IChannelHandler handler;
public DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, IChannelHandlerInvoker invoker, string name, IChannelHandler handler)
: base(pipeline, invoker, name, GetSkipPropagationFlags(handler))
{
Contract.Requires(handler != null);
this.handler = handler;
this.Handler = handler;
}
public override IChannelHandler Handler => this.handler;
public override IChannelHandler Handler { get; }
}
}

Просмотреть файл

@ -7,7 +7,9 @@ namespace DotNetty.Transport.Channels
using System.Diagnostics.Contracts;
using System.Net;
using System.Threading.Tasks;
using DotNetty.Common;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Internal;
using DotNetty.Common.Utilities;
public class DefaultChannelHandlerInvoker : IChannelHandlerInvoker
@ -19,25 +21,6 @@ namespace DotNetty.Transport.Channels
static readonly Action<object, object> InvokeUserEventTriggeredAction = (ctx, evt) => ChannelHandlerInvokerUtil.InvokeUserEventTriggeredNow((IChannelHandlerContext)ctx, evt);
static readonly Action<object, object> InvokeChannelReadAction = (ctx, msg) => ChannelHandlerInvokerUtil.InvokeChannelReadNow((IChannelHandlerContext)ctx, msg);
static readonly Action<object, object> InvokeWriteAsyncAction = (p, msg) =>
{
var promise = (TaskCompletionSource)p;
var context = (IChannelHandlerContext)promise.Task.AsyncState;
var channel = (AbstractChannel)context.Channel;
// todo: size is counted twice. is that a problem?
int size = channel.EstimatorHandle.Size(msg);
if (size > 0)
{
ChannelOutboundBuffer buffer = channel.Unsafe.OutboundBuffer;
// Check for null as it may be set to null if the channel is closed already
if (buffer != null)
{
buffer.DecrementPendingOutboundBytes(size);
}
}
ChannelHandlerInvokerUtil.InvokeWriteAsyncNow(context, msg).LinkOutcome(promise);
};
readonly IEventExecutor executor;
public DefaultChannelHandlerInvoker(IEventExecutor executor)
@ -284,10 +267,20 @@ namespace DotNetty.Transport.Channels
public Task InvokeWriteAsync(IChannelHandlerContext ctx, object msg)
{
Contract.Requires(msg != null);
// todo: check for cancellation
//if (!validatePromise(ctx, promise, false)) {
// // promise cancelled
// return;
// todo: cancellation support
//try
//{
// if (!validatePromise(ctx, promise, true))
// {
// ReferenceCountUtil.release(msg);
// return;
// }
//}
//catch (RuntimeException e)
//{
// ReferenceCountUtil.release(msg);
// throw e;
//}
if (this.executor.InEventLoop)
@ -296,29 +289,8 @@ namespace DotNetty.Transport.Channels
}
else
{
var channel = (AbstractChannel)ctx.Channel;
var promise = new TaskCompletionSource(ctx);
try
{
int size = channel.EstimatorHandle.Size(msg);
if (size > 0)
{
ChannelOutboundBuffer buffer = channel.Unsafe.OutboundBuffer;
// Check for null as it may be set to null if the channel is closed already
if (buffer != null)
{
buffer.IncrementPendingOutboundBytes(size);
}
}
this.executor.Execute(InvokeWriteAsyncAction, promise, msg);
}
catch (Exception cause)
{
ReferenceCountUtil.Release(msg); // todo: safe release?
promise.TrySetException(cause);
}
var promise = new TaskCompletionSource();
this.SafeExecuteOutbound(WriteTask.NewInstance(ctx, msg, promise), promise, msg);
return promise.Task;
}
}
@ -365,5 +337,100 @@ namespace DotNetty.Transport.Channels
}
return promise.Task;
}
void SafeExecuteOutbound(IRunnable task, TaskCompletionSource promise, object msg)
{
try
{
this.executor.Execute(task);
}
catch (Exception cause)
{
try
{
promise.TrySetException(cause);
}
finally
{
ReferenceCountUtil.Release(msg);
}
}
}
sealed class WriteTask : RecyclableMpscLinkedQueueNode<IRunnable>, IRunnable
{
static readonly bool EstimateTaskSizeOnSubmit =
SystemPropertyUtil.GetBoolean("io.netty.transport.estimateSizeOnSubmit", true);
// Assuming a 64-bit .NET VM, 16 bytes object header, 4 reference fields and 2 int field
static readonly int WriteTaskOverhead =
SystemPropertyUtil.GetInt("io.netty.transport.writeTaskSizeOverhead", 56);
IChannelHandlerContext ctx;
object msg;
TaskCompletionSource promise;
int size;
static readonly ThreadLocalPool<WriteTask> Recycler = new ThreadLocalPool<WriteTask>(handle => new WriteTask(handle));
public static WriteTask NewInstance(
IChannelHandlerContext ctx, object msg, TaskCompletionSource promise)
{
WriteTask task = Recycler.Take();
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
if (EstimateTaskSizeOnSubmit)
{
ChannelOutboundBuffer buffer = ctx.Channel.Unsafe.OutboundBuffer;
// Check for null as it may be set to null if the channel is closed already
if (buffer != null)
{
task.size = ((AbstractChannel)ctx.Channel).EstimatorHandle.Size(msg) + WriteTaskOverhead;
buffer.IncrementPendingOutboundBytes(task.size);
}
else
{
task.size = 0;
}
}
else
{
task.size = 0;
}
return task;
}
WriteTask(ThreadLocalPool.Handle handle)
: base(handle)
{
}
public void Run()
{
try
{
ChannelOutboundBuffer buffer = this.ctx.Channel.Unsafe.OutboundBuffer;
// Check for null as it may be set to null if the channel is closed already
if (EstimateTaskSizeOnSubmit)
{
buffer?.DecrementPendingOutboundBytes(this.size);
}
ChannelHandlerInvokerUtil.InvokeWriteAsyncNow(this.ctx, this.msg).LinkOutcome(this.promise);
}
finally
{
// Set to null so the GC can collect them directly
this.ctx = null;
this.msg = null;
this.promise = null;
}
}
public override IRunnable Value => this;
}
}
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -24,11 +24,6 @@ namespace DotNetty.Transport.Channels.Embedded
return channel.Unsafe.RegisterAsync(this);
}
IEventLoop IEventLoop.Unwrap()
{
return this;
}
public override bool IsShuttingDown => false;
public override Task TerminationCompletion

Просмотреть файл

@ -3,11 +3,12 @@
namespace DotNetty.Transport.Channels
{
using System;
using System.Net;
using System.Threading.Tasks;
using DotNetty.Buffers;
public interface IChannel
public interface IChannel : IComparable<IChannel>
{
IChannelId Id { get; }

Просмотреть файл

@ -450,10 +450,10 @@ namespace DotNetty.Transport.Channels
IChannelHandlerContext Context<T>() where T : class, IChannelHandler;
/// <summary>
/// Returns the {@link Channel} that this pipeline is attached to.
/// @return the channel. {@code null} if this pipeline is not attached yet.
/// Returns the <see cref="IChannel"/> that this pipeline is attached to.
/// </summary>
IChannel Channel();
/// <returns>the channel. <c>null</c> if this pipeline is not attached yet.</returns>
IChannel Channel { get; }
/// <summary>
/// A {@link Channel} is active now, which means it is connected.

Просмотреть файл

@ -11,7 +11,5 @@ namespace DotNetty.Transport.Channels
IChannelHandlerInvoker Invoker { get; }
Task RegisterAsync(IChannel channel);
IEventLoop Unwrap();
}
}

Просмотреть файл

@ -1,174 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Transport.Channels
{
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;
abstract class PausableChannelEventExecutor : IPausableEventExecutor
{
public abstract void RejectNewTasks();
public abstract void AcceptNewTasks();
public abstract bool IsAcceptingNewTasks { get; }
internal abstract IChannel Channel { get; }
public abstract IEventExecutor Unwrap();
public IEventExecutor Executor => this;
public bool InEventLoop => this.Unwrap().InEventLoop;
public bool IsInEventLoop(Thread thread)
{
return this.Unwrap().IsInEventLoop(thread);
}
public void Execute(IRunnable command)
{
this.VerifyAcceptingNewTasks();
this.Unwrap().Execute(command);
}
public void Execute(Action<object> action, object state)
{
this.VerifyAcceptingNewTasks();
this.Unwrap().Execute(action, state);
}
public void Execute(Action action)
{
this.VerifyAcceptingNewTasks();
this.Unwrap().Execute(action);
}
public IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().Schedule(action, context, state, delay);
}
public IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().Schedule(action, state, delay);
}
public IScheduledTask Schedule(Action action, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().Schedule(action, delay);
}
public Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().ScheduleAsync(action, state, delay, cancellationToken);
}
public Task ScheduleAsync(Action<object> action, object state, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().ScheduleAsync(action, state, delay);
}
public Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().ScheduleAsync(action, context, state, delay, cancellationToken);
}
public Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().ScheduleAsync(action, context, state, delay);
}
public Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().ScheduleAsync(action, delay, cancellationToken);
}
public Task ScheduleAsync(Action action, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().ScheduleAsync(action, delay);
}
public Task<T> SubmitAsync<T>(Func<T> func)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().SubmitAsync(func);
}
public Task<T> SubmitAsync<T>(Func<T> func, CancellationToken cancellationToken)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().SubmitAsync(func, cancellationToken);
}
public Task<T> SubmitAsync<T>(Func<object, T> func, object state)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().SubmitAsync(func, state);
}
public Task<T> SubmitAsync<T>(Func<object, T> func, object state, CancellationToken cancellationToken)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().SubmitAsync(func, state, cancellationToken);
}
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().SubmitAsync(func, context, state);
}
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state, CancellationToken cancellationToken)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().SubmitAsync(func, context, state, cancellationToken);
}
public void Execute(Action<object, object> action, object context, object state)
{
this.VerifyAcceptingNewTasks();
this.Unwrap().Execute(action, context, state);
}
public bool IsShuttingDown => this.Unwrap().IsShuttingDown;
public Task ShutdownGracefullyAsync()
{
return this.Unwrap().ShutdownGracefullyAsync();
}
public Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
{
return this.Unwrap().ShutdownGracefullyAsync(quietPeriod, timeout);
}
public Task TerminationCompletion => this.Unwrap().TerminationCompletion;
public bool IsShutdown => this.Unwrap().IsShutdown;
public bool IsTerminated => this.Unwrap().IsTerminated;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void VerifyAcceptingNewTasks()
{
if (!this.IsAcceptingNewTasks)
{
throw new RejectedExecutionException();
}
}
}
}

Просмотреть файл

@ -33,10 +33,5 @@ namespace DotNetty.Transport.Channels
{
return channel.Unsafe.RegisterAsync(this);
}
IEventLoop IEventLoop.Unwrap()
{
return this;
}
}
}

Просмотреть файл

@ -96,7 +96,6 @@
<Compile Include="Channels\IMessageSizeEstimatorHandle.cs" />
<Compile Include="Channels\IRecvByteBufAllocator.cs" />
<Compile Include="Channels\IRecvByteBufAllocatorHandle.cs" />
<Compile Include="Channels\PausableChannelEventExecutor.cs" />
<Compile Include="Channels\PendingWriteQueue.cs" />
<Compile Include="Channels\RejectedExecutionException.cs" />
<Compile Include="Channels\SingleThreadEventLoop.cs" />

Просмотреть файл

@ -18,7 +18,7 @@ namespace DotNetty.Buffers.Tests
{
readonly MockRepository mockRepo = new MockRepository(MockBehavior.Strict);
[Fact]
[Fact(Skip = "logging or GC is acting funny in xUnit console runner.")]
public void UnderReleaseBufferLeak()
{
ResourceLeakDetector.DetectionLevel preservedLevel = ResourceLeakDetector.Level;

Просмотреть файл

@ -137,7 +137,10 @@ namespace DotNetty.Transport.Tests.Performance.Sockets
}
}
this.clientChannel.Flush();
this.ResetEvent.Wait(this.Timeout);
if (!this.ResetEvent.Wait(this.Timeout))
{
Console.WriteLine("*** TIMED OUT ***");
}
}
[PerfCleanup]

Просмотреть файл

@ -3,6 +3,7 @@
namespace DotNetty.Transport.Tests.Performance.Utilities
{
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
public class ReadFinishedHandler : ChannelHandlerAdapter
@ -19,11 +20,11 @@ namespace DotNetty.Transport.Tests.Performance.Utilities
public override void ChannelRead(IChannelHandlerContext context, object message)
{
ReferenceCountUtil.Release(message);
if (++this.actualReads == this.expectedReads)
{
this.signal.Signal();
}
context.FireChannelRead(message);
}
}
}