From d5a4c3077813c17eb8f373e02c345b56cc1ec4a2 Mon Sep 17 00:00:00 2001 From: mgortman Date: Mon, 4 Apr 2016 02:42:54 -0700 Subject: [PATCH] Adds IEventExecutor.Schedule, proper cancellation of scheduled tasks Motivation: Allow for less GC when scheduling if Task-based API is not required. Allow for proper cleanup when canceling scheduled task so that associated resources can be cleaned up early. Modifications: - Moved out scheduled task implementations from AbstractScheduledEventExecutor - Introduced IScheduledTask and aligned IScheduledRunnable - Implemented removal of scheduled task upon its cancellation - Extra minor fixes - IByteBuffer.ToString(Encoding) honoring ArrayOffset - InternalLoggerFactory creating default logging factory lazily Result: Scheduled tasks that normally never fire (e.g. timeout handling) do not cause resources to be held up unnecessarily in memory for longer period of time if not necessary. --- src/DotNetty.Buffers/ByteBufferUtil.cs | 2 +- .../Concurrency/AbstractEventExecutor.cs | 15 ++ .../AbstractScheduledEventExecutor.cs | 191 +++++------------- .../Concurrency/ActionScheduledAsyncTask.cs | 24 +++ .../Concurrency/ActionScheduledTask.cs | 23 +++ .../Concurrency/IEventExecutor.cs | 28 +++ .../Concurrency/IScheduledRunnable.cs | 11 + .../Concurrency/IScheduledTask.cs | 19 ++ .../Concurrency/ScheduledAsyncTask.cs | 33 +++ .../Concurrency/ScheduledTask.cs | 110 ++++++++++ .../StateActionScheduledAsyncTask.cs | 25 +++ .../Concurrency/StateActionScheduledTask.cs | 23 +++ ...tateActionWithContextScheduledAsyncTask.cs | 27 +++ .../StateActionWithContextScheduledTask.cs | 26 +++ src/DotNetty.Common/DotNetty.Common.csproj | 10 + .../Internal/Logging/InternalLoggerFactory.cs | 22 +- .../Utilities/PriorityQueue.cs | 27 ++- .../Channels/PausableChannelEventExecutor.cs | 18 ++ .../Channels/Sockets/AbstractSocketChannel.cs | 51 +++-- .../DotNetty.Common.Tests.csproj | 1 + .../ThreadLocalPoolTest.cs | 2 +- .../Utilities/PriorityQueueTest.cs | 65 ++++++ .../Channel/Embedded/EmbeddedChannelTest.cs | 33 +++ 23 files changed, 615 insertions(+), 171 deletions(-) create mode 100644 src/DotNetty.Common/Concurrency/ActionScheduledAsyncTask.cs create mode 100644 src/DotNetty.Common/Concurrency/ActionScheduledTask.cs create mode 100644 src/DotNetty.Common/Concurrency/IScheduledRunnable.cs create mode 100644 src/DotNetty.Common/Concurrency/IScheduledTask.cs create mode 100644 src/DotNetty.Common/Concurrency/ScheduledAsyncTask.cs create mode 100644 src/DotNetty.Common/Concurrency/ScheduledTask.cs create mode 100644 src/DotNetty.Common/Concurrency/StateActionScheduledAsyncTask.cs create mode 100644 src/DotNetty.Common/Concurrency/StateActionScheduledTask.cs create mode 100644 src/DotNetty.Common/Concurrency/StateActionWithContextScheduledAsyncTask.cs create mode 100644 src/DotNetty.Common/Concurrency/StateActionWithContextScheduledTask.cs create mode 100644 test/DotNetty.Common.Tests/Utilities/PriorityQueueTest.cs diff --git a/src/DotNetty.Buffers/ByteBufferUtil.cs b/src/DotNetty.Buffers/ByteBufferUtil.cs index 66503f1..2ee9195 100644 --- a/src/DotNetty.Buffers/ByteBufferUtil.cs +++ b/src/DotNetty.Buffers/ByteBufferUtil.cs @@ -579,7 +579,7 @@ namespace DotNetty.Buffers if (src.HasArray) { - return encoding.GetString(src.Array, readerIndex, len); + return encoding.GetString(src.Array, src.ArrayOffset + readerIndex, len); } else { diff --git a/src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs b/src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs index 6e29e53..bf60532 100644 --- a/src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs +++ b/src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs @@ -55,6 +55,21 @@ namespace DotNetty.Common.Concurrency this.Execute(new ActionTaskQueueNode(action)); } + public virtual IScheduledTask Schedule(Action action, TimeSpan delay) + { + throw new NotSupportedException(); + } + + public virtual IScheduledTask Schedule(Action action, object state, TimeSpan delay) + { + throw new NotSupportedException(); + } + + public virtual IScheduledTask Schedule(Action action, object context, object state, TimeSpan delay) + { + throw new NotSupportedException(); + } + public virtual Task ScheduleAsync(Action action, TimeSpan delay) { return this.ScheduleAsync(action, delay, CancellationToken.None); diff --git a/src/DotNetty.Common/Concurrency/AbstractScheduledEventExecutor.cs b/src/DotNetty.Common/Concurrency/AbstractScheduledEventExecutor.cs index 59700bf..8969638 100644 --- a/src/DotNetty.Common/Concurrency/AbstractScheduledEventExecutor.cs +++ b/src/DotNetty.Common/Concurrency/AbstractScheduledEventExecutor.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. namespace DotNetty.Common.Concurrency @@ -15,8 +15,6 @@ namespace DotNetty.Common.Concurrency /// public abstract class AbstractScheduledEventExecutor : AbstractEventExecutor { - static readonly Action AddScheduledTaskAction = (e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t); - protected readonly PriorityQueue ScheduledTaskQueue = new PriorityQueue(); // TODO: support for EventExecutorGroup @@ -94,172 +92,89 @@ namespace DotNetty.Common.Concurrency return scheduledTask != null && scheduledTask.Deadline <= PreciseTimeSpan.FromStart; } + public override IScheduledTask Schedule(Action action, TimeSpan delay) + { + return this.Schedule(new ActionScheduledTask(this, action, PreciseTimeSpan.Deadline(delay))); + } + + public override IScheduledTask Schedule(Action action, object state, TimeSpan delay) + { + return this.Schedule(new StateActionScheduledTask(this, action, state, PreciseTimeSpan.Deadline(delay))); + } + + public override IScheduledTask Schedule(Action action, object context, object state, TimeSpan delay) + { + return this.Schedule(new StateActionWithContextScheduledTask(this, action, context, state, PreciseTimeSpan.Deadline(delay))); + } + public override Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken) { - var scheduledTask = new ActionScheduledTask(action, PreciseTimeSpan.Deadline(delay), cancellationToken); - if (this.InEventLoop) + if (cancellationToken.IsCancellationRequested) { - this.ScheduledTaskQueue.Enqueue(scheduledTask); + return TaskEx.Cancelled; } - else + + if (!cancellationToken.CanBeCanceled) { - this.Execute(AddScheduledTaskAction, this, scheduledTask); + return this.Schedule(action, delay).Completion; } - return scheduledTask.Completion; + + return this.Schedule(new ActionScheduledAsyncTask(this, action, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion; } public override Task ScheduleAsync(Action action, object state, TimeSpan delay, CancellationToken cancellationToken) { - var scheduledTask = new StateActionScheduledTask(action, state, PreciseTimeSpan.Deadline(delay), cancellationToken); - if (this.InEventLoop) + if (cancellationToken.IsCancellationRequested) { - this.ScheduledTaskQueue.Enqueue(scheduledTask); + return TaskEx.Cancelled; } - else + + if (!cancellationToken.CanBeCanceled) { - this.Execute(AddScheduledTaskAction, this, scheduledTask); + return this.Schedule(action, state, delay).Completion; } - return scheduledTask.Completion; + + return this.Schedule(new StateActionScheduledAsyncTask(this, action, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion; } public override Task ScheduleAsync(Action action, object context, object state, TimeSpan delay, CancellationToken cancellationToken) { - var scheduledTask = new StateActionWithContextScheduledTask(action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken); + if (cancellationToken.IsCancellationRequested) + { + return TaskEx.Cancelled; + } + + if (!cancellationToken.CanBeCanceled) + { + return this.Schedule(action, context, state, delay).Completion; + } + + return this.Schedule(new StateActionWithContextScheduledAsyncTask(this, action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion; + } + + protected IScheduledRunnable Schedule(IScheduledRunnable task) + { if (this.InEventLoop) { - this.ScheduledTaskQueue.Enqueue(scheduledTask); + this.ScheduledTaskQueue.Enqueue(task); } else { - this.Execute(AddScheduledTaskAction, this, scheduledTask); + this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t), this, task); } - return scheduledTask.Completion; + return task; } - #region Scheduled task data structures - - protected interface IScheduledRunnable : IRunnable, IComparable + internal void RemoveScheduled(IScheduledRunnable task) { - PreciseTimeSpan Deadline { get; } - - bool Cancel(); - } - - protected abstract class ScheduledTaskBase : MpscLinkedQueueNode, IScheduledRunnable - { - readonly TaskCompletionSource promise; - - protected ScheduledTaskBase(PreciseTimeSpan deadline, TaskCompletionSource promise, CancellationToken cancellationToken) + if (this.InEventLoop) { - this.promise = promise; - this.Deadline = deadline; - this.CancellationToken = cancellationToken; + this.ScheduledTaskQueue.Remove(task); } - - public PreciseTimeSpan Deadline { get; private set; } - - public bool Cancel() + else { - return this.promise.TrySetCanceled(); - } - - public Task Completion - { - get { return this.promise.Task; } - } - - public CancellationToken CancellationToken { get; private set; } - - int IComparable.CompareTo(IScheduledRunnable other) - { - Contract.Requires(other != null); - - return this.Deadline.CompareTo(other.Deadline); - } - - public override IRunnable Value - { - get { return this; } - } - - public void Run() - { - if (this.CancellationToken.IsCancellationRequested) - { - this.promise.TrySetCanceled(); - return; - } - if (this.Completion.IsCanceled) - { - return; - } - try - { - this.Execute(); - this.promise.TryComplete(); - } - catch (Exception ex) - { - // todo: check for fatal - this.promise.TrySetException(ex); - } - } - - protected abstract void Execute(); - } - - sealed class ActionScheduledTask : ScheduledTaskBase - { - readonly Action action; - - public ActionScheduledTask(Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken) - : base(deadline, new TaskCompletionSource(), cancellationToken) - { - this.action = action; - } - - protected override void Execute() - { - this.action(); + this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Remove((IScheduledRunnable)t), this, task); } } - - sealed class StateActionScheduledTask : ScheduledTaskBase - { - readonly Action action; - - public StateActionScheduledTask(Action action, object state, PreciseTimeSpan deadline, - CancellationToken cancellationToken) - : base(deadline, new TaskCompletionSource(state), cancellationToken) - { - this.action = action; - } - - protected override void Execute() - { - this.action(this.Completion.AsyncState); - } - } - - sealed class StateActionWithContextScheduledTask : ScheduledTaskBase - { - readonly Action action; - readonly object context; - - public StateActionWithContextScheduledTask(Action action, object context, object state, - PreciseTimeSpan deadline, CancellationToken cancellationToken) - : base(deadline, new TaskCompletionSource(state), cancellationToken) - { - this.action = action; - this.context = context; - } - - protected override void Execute() - { - this.action(this.context, this.Completion.AsyncState); - } - } - - #endregion } } \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/ActionScheduledAsyncTask.cs b/src/DotNetty.Common/Concurrency/ActionScheduledAsyncTask.cs new file mode 100644 index 0000000..99f854f --- /dev/null +++ b/src/DotNetty.Common/Concurrency/ActionScheduledAsyncTask.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System; + using System.Threading; + + sealed class ActionScheduledAsyncTask : ScheduledAsyncTask + { + readonly Action action; + + public ActionScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken) + : base(executor, deadline, new TaskCompletionSource(), cancellationToken) + { + this.action = action; + } + + protected override void Execute() + { + this.action(); + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/ActionScheduledTask.cs b/src/DotNetty.Common/Concurrency/ActionScheduledTask.cs new file mode 100644 index 0000000..e02a1c7 --- /dev/null +++ b/src/DotNetty.Common/Concurrency/ActionScheduledTask.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System; + + sealed class ActionScheduledTask : ScheduledTask + { + readonly Action action; + + public ActionScheduledTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline) + : base(executor, deadline, new TaskCompletionSource()) + { + this.action = action; + } + + protected override void Execute() + { + this.action(); + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/IEventExecutor.cs b/src/DotNetty.Common/Concurrency/IEventExecutor.cs index 4216a60..4e4edf3 100644 --- a/src/DotNetty.Common/Concurrency/IEventExecutor.cs +++ b/src/DotNetty.Common/Concurrency/IEventExecutor.cs @@ -105,6 +105,34 @@ namespace DotNetty.Common.Concurrency /// void Execute(Action action, object context, object state); + /// + /// Schedules the given action for execution after the specified delay would pass. + /// + /// + /// Threading specifics are determined by IEventExecutor implementation. + /// + IScheduledTask Schedule(Action action, TimeSpan delay); + + /// + /// Schedules the given action for execution after the specified delay would pass. + /// + /// + /// parameter is useful to when repeated execution of an action against + /// different objects is needed. + /// Threading specifics are determined by IEventExecutor implementation. + /// + IScheduledTask Schedule(Action action, object state, TimeSpan delay); + + /// + /// Schedules the given action for execution after the specified delay would pass. + /// + /// + /// and parameters are useful when repeated execution of + /// an action against different objects in different context is needed. + /// Threading specifics are determined by IEventExecutor implementation. + /// + IScheduledTask Schedule(Action action, object context, object state, TimeSpan delay); + /// /// Schedules the given action for execution after the specified delay would pass. /// diff --git a/src/DotNetty.Common/Concurrency/IScheduledRunnable.cs b/src/DotNetty.Common/Concurrency/IScheduledRunnable.cs new file mode 100644 index 0000000..e33640b --- /dev/null +++ b/src/DotNetty.Common/Concurrency/IScheduledRunnable.cs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System; + + public interface IScheduledRunnable : IRunnable, IScheduledTask, IComparable + { + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/IScheduledTask.cs b/src/DotNetty.Common/Concurrency/IScheduledTask.cs new file mode 100644 index 0000000..82bb4fc --- /dev/null +++ b/src/DotNetty.Common/Concurrency/IScheduledTask.cs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System.Runtime.CompilerServices; + using System.Threading.Tasks; + + public interface IScheduledTask + { + bool Cancel(); + + PreciseTimeSpan Deadline { get; } + + Task Completion { get; } + + TaskAwaiter GetAwaiter(); + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/ScheduledAsyncTask.cs b/src/DotNetty.Common/Concurrency/ScheduledAsyncTask.cs new file mode 100644 index 0000000..11df852 --- /dev/null +++ b/src/DotNetty.Common/Concurrency/ScheduledAsyncTask.cs @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System.Threading; + + abstract class ScheduledAsyncTask : ScheduledTask + { + readonly CancellationToken cancellationToken; + CancellationTokenRegistration cancellationTokenRegistration; + + protected ScheduledAsyncTask(AbstractScheduledEventExecutor executor, PreciseTimeSpan deadline, TaskCompletionSource promise, CancellationToken cancellationToken) + : base(executor, deadline, promise) + { + this.cancellationToken = cancellationToken; + this.cancellationTokenRegistration = cancellationToken.Register(s => ((ScheduledAsyncTask)s).Cancel(), this); + } + + public override void Run() + { + this.cancellationTokenRegistration.Dispose(); + if (this.cancellationToken.IsCancellationRequested) + { + this.Promise.TrySetCanceled(); + } + else + { + base.Run(); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/ScheduledTask.cs b/src/DotNetty.Common/Concurrency/ScheduledTask.cs new file mode 100644 index 0000000..04ec040 --- /dev/null +++ b/src/DotNetty.Common/Concurrency/ScheduledTask.cs @@ -0,0 +1,110 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System; + using System.Diagnostics.Contracts; + using System.Runtime.CompilerServices; + using System.Threading; + using System.Threading.Tasks; + using DotNetty.Common.Utilities; + + abstract class ScheduledTask : MpscLinkedQueueNode, IScheduledRunnable + { + const int CancellationProhibited = 1; + const int CancellationRequested = 1 << 1; + + protected readonly TaskCompletionSource Promise; + protected readonly AbstractScheduledEventExecutor Executor; + int volatileCancellationState; + + protected ScheduledTask(AbstractScheduledEventExecutor executor, PreciseTimeSpan deadline, TaskCompletionSource promise) + { + this.Executor = executor; + this.Promise = promise; + this.Deadline = deadline; + } + + public PreciseTimeSpan Deadline { get; private set; } + + public bool Cancel() + { + if (!this.AtomicCancellationStateUpdate(CancellationProhibited, CancellationRequested)) + { + return false; + } + + bool canceled = this.Promise.TrySetCanceled(); + if (canceled) + { + this.Executor.RemoveScheduled(this); + } + return canceled; + } + + public Task Completion + { + get { return this.Promise.Task; } + } + + public TaskAwaiter GetAwaiter() + { + return this.Completion.GetAwaiter(); + } + + int IComparable.CompareTo(IScheduledRunnable other) + { + Contract.Requires(other != null); + + return this.Deadline.CompareTo(other.Deadline); + } + + public override IRunnable Value + { + get { return this; } + } + + public virtual void Run() + { + if (this.TrySetUncancelable()) + { + try + { + this.Execute(); + this.Promise.TryComplete(); + } + catch (Exception ex) + { + // todo: check for fatal + this.Promise.TrySetException(ex); + } + } + } + + protected abstract void Execute(); + + bool TrySetUncancelable() + { + return this.AtomicCancellationStateUpdate(CancellationProhibited, CancellationRequested); + } + + bool AtomicCancellationStateUpdate(int newBits, int illegalBits) + { + int cancellationState = Volatile.Read(ref this.volatileCancellationState); + int oldCancellationState; + do + { + oldCancellationState = cancellationState; + if ((cancellationState & illegalBits) != 0) + { + return false; + } + cancellationState = Interlocked.CompareExchange(ref this.volatileCancellationState, cancellationState | newBits, cancellationState); + } + while (cancellationState != oldCancellationState); + + return true; + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/StateActionScheduledAsyncTask.cs b/src/DotNetty.Common/Concurrency/StateActionScheduledAsyncTask.cs new file mode 100644 index 0000000..84035c0 --- /dev/null +++ b/src/DotNetty.Common/Concurrency/StateActionScheduledAsyncTask.cs @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System; + using System.Threading; + + sealed class StateActionScheduledAsyncTask : ScheduledAsyncTask + { + readonly Action action; + + public StateActionScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action action, object state, PreciseTimeSpan deadline, + CancellationToken cancellationToken) + : base(executor, deadline, new TaskCompletionSource(state), cancellationToken) + { + this.action = action; + } + + protected override void Execute() + { + this.action(this.Completion.AsyncState); + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/StateActionScheduledTask.cs b/src/DotNetty.Common/Concurrency/StateActionScheduledTask.cs new file mode 100644 index 0000000..187e6b6 --- /dev/null +++ b/src/DotNetty.Common/Concurrency/StateActionScheduledTask.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System; + + sealed class StateActionScheduledTask : ScheduledTask + { + readonly Action action; + + public StateActionScheduledTask(AbstractScheduledEventExecutor executor, Action action, object state, PreciseTimeSpan deadline) + : base(executor, deadline, new TaskCompletionSource(state)) + { + this.action = action; + } + + protected override void Execute() + { + this.action(this.Completion.AsyncState); + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/StateActionWithContextScheduledAsyncTask.cs b/src/DotNetty.Common/Concurrency/StateActionWithContextScheduledAsyncTask.cs new file mode 100644 index 0000000..609618a --- /dev/null +++ b/src/DotNetty.Common/Concurrency/StateActionWithContextScheduledAsyncTask.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System; + using System.Threading; + + sealed class StateActionWithContextScheduledAsyncTask : ScheduledAsyncTask + { + readonly Action action; + readonly object context; + + public StateActionWithContextScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action action, object context, object state, + PreciseTimeSpan deadline, CancellationToken cancellationToken) + : base(executor, deadline, new TaskCompletionSource(state), cancellationToken) + { + this.action = action; + this.context = context; + } + + protected override void Execute() + { + this.action(this.context, this.Completion.AsyncState); + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/StateActionWithContextScheduledTask.cs b/src/DotNetty.Common/Concurrency/StateActionWithContextScheduledTask.cs new file mode 100644 index 0000000..c333de7 --- /dev/null +++ b/src/DotNetty.Common/Concurrency/StateActionWithContextScheduledTask.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Concurrency +{ + using System; + + sealed class StateActionWithContextScheduledTask : ScheduledTask + { + readonly Action action; + readonly object context; + + public StateActionWithContextScheduledTask(AbstractScheduledEventExecutor executor, Action action, object context, object state, + PreciseTimeSpan deadline) + : base(executor, deadline, new TaskCompletionSource(state)) + { + this.action = action; + this.context = context; + } + + protected override void Execute() + { + this.action(this.context, this.Completion.AsyncState); + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/DotNetty.Common.csproj b/src/DotNetty.Common/DotNetty.Common.csproj index 3d721d8..17e424f 100644 --- a/src/DotNetty.Common/DotNetty.Common.csproj +++ b/src/DotNetty.Common/DotNetty.Common.csproj @@ -125,7 +125,17 @@ Properties\SharedAssemblyInfo.cs + + + + + + + + + + diff --git a/src/DotNetty.Common/Internal/Logging/InternalLoggerFactory.cs b/src/DotNetty.Common/Internal/Logging/InternalLoggerFactory.cs index 4b7b2aa..e2a1780 100644 --- a/src/DotNetty.Common/Internal/Logging/InternalLoggerFactory.cs +++ b/src/DotNetty.Common/Internal/Logging/InternalLoggerFactory.cs @@ -5,6 +5,8 @@ namespace DotNetty.Common.Internal.Logging { using System; using System.Diagnostics.Contracts; + using System.Runtime.CompilerServices; + using System.Threading; /// /// Creates an or changes the default factory @@ -21,8 +23,7 @@ namespace DotNetty.Common.Internal.Logging /// public abstract class InternalLoggerFactory { - static volatile InternalLoggerFactory defaultFactory = - NewDefaultFactory(typeof(InternalLoggerFactory).FullName); + static InternalLoggerFactory defaultFactory; static InternalLoggerFactory() { @@ -48,12 +49,25 @@ namespace DotNetty.Common.Internal.Logging /// public static InternalLoggerFactory DefaultFactory { - get { return defaultFactory; } + get + { + InternalLoggerFactory factory = Volatile.Read(ref defaultFactory); + if (factory == null) + { + factory = NewDefaultFactory(typeof(InternalLoggerFactory).FullName); + InternalLoggerFactory current = Interlocked.CompareExchange(ref defaultFactory, factory, null); + if (current != null) + { + return current; + } + } + return factory; + } set { Contract.Requires(value != null); - defaultFactory = value; + Volatile.Write(ref defaultFactory, value); } } diff --git a/src/DotNetty.Common/Utilities/PriorityQueue.cs b/src/DotNetty.Common/Utilities/PriorityQueue.cs index 8d77d4b..09ebf62 100644 --- a/src/DotNetty.Common/Utilities/PriorityQueue.cs +++ b/src/DotNetty.Common/Utilities/PriorityQueue.cs @@ -7,7 +7,6 @@ namespace DotNetty.Common.Utilities using System.Collections; using System.Collections.Generic; using System.Diagnostics.Contracts; - using System.Linq; public class PriorityQueue : IEnumerable where T : class @@ -73,6 +72,32 @@ namespace DotNetty.Common.Utilities this.BubbleUp(oldCount, item); } + public void Remove(T item) + { + int index = Array.IndexOf(this.items, item); + if (index == -1) + { + return; + } + + this.count--; + if (index == this.count) + { + this.items[index] = default(T); + } + else + { + T last = this.items[this.count]; + this.items[this.count] = default(T); + this.TrickleDown(index, last); + if (this.items[index] == last) + { + this.BubbleUp(index, last); + } + } + + } + void BubbleUp(int index, T item) { // index > 0 means there is a parent diff --git a/src/DotNetty.Transport/Channels/PausableChannelEventExecutor.cs b/src/DotNetty.Transport/Channels/PausableChannelEventExecutor.cs index e759018..9f52042 100644 --- a/src/DotNetty.Transport/Channels/PausableChannelEventExecutor.cs +++ b/src/DotNetty.Transport/Channels/PausableChannelEventExecutor.cs @@ -54,6 +54,24 @@ namespace DotNetty.Transport.Channels this.Unwrap().Execute(action); } + public IScheduledTask Schedule(Action action, object context, object state, TimeSpan delay) + { + this.VerifyAcceptingNewTasks(); + return this.Unwrap().Schedule(action, context, state, delay); + } + + public IScheduledTask Schedule(Action action, object state, TimeSpan delay) + { + this.VerifyAcceptingNewTasks(); + return this.Unwrap().Schedule(action, state, delay); + } + + public IScheduledTask Schedule(Action action, TimeSpan delay) + { + this.VerifyAcceptingNewTasks(); + return this.Unwrap().Schedule(action, delay); + } + public Task ScheduleAsync(Action action, object state, TimeSpan delay, CancellationToken cancellationToken) { this.VerifyAcceptingNewTasks(); diff --git a/src/DotNetty.Transport/Channels/Sockets/AbstractSocketChannel.cs b/src/DotNetty.Transport/Channels/Sockets/AbstractSocketChannel.cs index badc255..564dd1a 100644 --- a/src/DotNetty.Transport/Channels/Sockets/AbstractSocketChannel.cs +++ b/src/DotNetty.Transport/Channels/Sockets/AbstractSocketChannel.cs @@ -7,7 +7,6 @@ namespace DotNetty.Transport.Channels.Sockets using System.Diagnostics.Contracts; using System.Net; using System.Net.Sockets; - using System.Threading; using System.Threading.Tasks; using DotNetty.Buffers; using DotNetty.Common.Concurrency; @@ -41,7 +40,7 @@ namespace DotNetty.Transport.Channels.Sockets volatile StateFlags state; TaskCompletionSource connectPromise; - CancellationTokenSource connectCancellation; + IScheduledTask connectCancellationTask; protected AbstractSocketChannel(IChannel parent, Socket socket) : base(parent) @@ -223,7 +222,7 @@ namespace DotNetty.Transport.Channels.Sockets get { return (AbstractSocketChannel)this.channel; } } - public override sealed Task ConnectAsync(EndPoint remoteAddress, EndPoint localAddress) + public sealed override Task ConnectAsync(EndPoint remoteAddress, EndPoint localAddress) { // todo: handle cancellation AbstractSocketChannel ch = this.Channel; @@ -253,36 +252,36 @@ namespace DotNetty.Transport.Channels.Sockets TimeSpan connectTimeout = ch.Configuration.ConnectTimeout; if (connectTimeout > TimeSpan.Zero) { - CancellationTokenSource cts = ch.connectCancellation = new CancellationTokenSource(); - - ch.EventLoop.ScheduleAsync( - c => + ch.connectCancellationTask = ch.EventLoop.Schedule( + (c, a) => { // todo: make static / cache delegate?.. var self = (AbstractSocketChannel)c; // todo: call Socket.CancelConnectAsync(...) - TaskCompletionSource promise = ch.connectPromise; - var cause = - new ConnectTimeoutException("connection timed out: " + remoteAddress); + TaskCompletionSource promise = self.connectPromise; + var cause = new ConnectTimeoutException("connection timed out: " + a.ToString()); if (promise != null && promise.TrySetException(cause)) { self.CloseAsync(); } }, this.channel, - connectTimeout, - cts.Token); + remoteAddress, + connectTimeout); } - ch.connectPromise.Task.ContinueWith(t => - { - if (ch.connectCancellation != null) + ch.connectPromise.Task.ContinueWith( + (t, s) => { - ch.connectCancellation.Cancel(); - } - ch.connectPromise = null; - this.channel.CloseAsync(); - }, + var c = (AbstractSocketChannel)s; + if (c.connectCancellationTask != null) + { + c.connectCancellationTask.Cancel(); + } + c.connectPromise = null; + c.CloseAsync(); + }, + ch, TaskContinuationOptions.OnlyOnCanceled | TaskContinuationOptions.ExecuteSynchronously); return ch.connectPromise.Task; @@ -356,9 +355,9 @@ namespace DotNetty.Transport.Channels.Sockets { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 - if (ch.connectCancellation != null) + if (ch.connectCancellationTask != null) { - ch.connectCancellation.Cancel(); + ch.connectCancellationTask.Cancel(); } ch.connectPromise = null; } @@ -461,11 +460,11 @@ namespace DotNetty.Transport.Channels.Sockets this.connectPromise = null; } - CancellationTokenSource cancellation = this.connectCancellation; - if (cancellation != null) + IScheduledTask cancellationTask = this.connectCancellationTask; + if (cancellationTask != null) { - cancellation.Cancel(); - this.connectCancellation = null; + cancellationTask.Cancel(); + this.connectCancellationTask = null; } SocketChannelAsyncOperation readOp = this.readOperation; diff --git a/test/DotNetty.Common.Tests/DotNetty.Common.Tests.csproj b/test/DotNetty.Common.Tests/DotNetty.Common.Tests.csproj index 876f20a..84199d4 100644 --- a/test/DotNetty.Common.Tests/DotNetty.Common.Tests.csproj +++ b/test/DotNetty.Common.Tests/DotNetty.Common.Tests.csproj @@ -83,6 +83,7 @@ + diff --git a/test/DotNetty.Common.Tests/ThreadLocalPoolTest.cs b/test/DotNetty.Common.Tests/ThreadLocalPoolTest.cs index 62d1bcf..7384c51 100644 --- a/test/DotNetty.Common.Tests/ThreadLocalPoolTest.cs +++ b/test/DotNetty.Common.Tests/ThreadLocalPoolTest.cs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -namespace DotNetty.Buffers.Tests +namespace DotNetty.Common.Tests { using System; using System.Threading.Tasks; diff --git a/test/DotNetty.Common.Tests/Utilities/PriorityQueueTest.cs b/test/DotNetty.Common.Tests/Utilities/PriorityQueueTest.cs new file mode 100644 index 0000000..f03452d --- /dev/null +++ b/test/DotNetty.Common.Tests/Utilities/PriorityQueueTest.cs @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Common.Tests.Utilities +{ + using System; + using DotNetty.Common.Utilities; + using Xunit; + + public class PriorityQueueTest + { + [Theory] + [InlineData(0, -1)] + [InlineData(1, 0)] + [InlineData(1, -1)] + [InlineData(2, 0)] + [InlineData(2, 1)] + [InlineData(3, 0)] + [InlineData(3, 1)] + [InlineData(3, 2)] + [InlineData(7, 5)] + public void PriorityQueueRemoveTest(int length, int removeIndex) + { + var queue = new PriorityQueue>(); + for (int i = length - 1; i >= 0; i--) + { + queue.Enqueue(Tuple.Create(i)); + } + + if (removeIndex == -1) + { + queue.Remove(Tuple.Create(length)); + Assert.Equal(length, queue.Count); + } + else + { + queue.Remove(Tuple.Create(removeIndex)); + Assert.Equal(length - 1, queue.Count); + } + } + + [Theory] + [InlineData(new[] { 1, 2, 3, 4 }, new[] { 1, 2, 3, 4 })] + [InlineData(new[] { 4, 3, 2, 1 }, new[] { 1, 2, 3, 4 })] + [InlineData(new[] { 3, 2, 1 }, new[] { 1, 2, 3 })] + [InlineData(new[] { 1, 3, 2 }, new[] { 1, 2, 3 })] + [InlineData(new[] { 1, 2 }, new[] { 1, 2 })] + [InlineData(new[] { 2, 1 }, new[] { 1, 2 })] + public void PriorityQueueOrderTest(int[] input, int[] expectedOutput) + { + var queue = new PriorityQueue>(); + foreach (int value in input) + { + queue.Enqueue(Tuple.Create(value)); + } + + for (int index = 0; index < expectedOutput.Length; index++) + { + Tuple item = queue.Dequeue(); + Assert.Equal(expectedOutput[index], item.Item1); + } + Assert.Equal(0, queue.Count); + } + } +} \ No newline at end of file diff --git a/test/DotNetty.Transport.Tests/Channel/Embedded/EmbeddedChannelTest.cs b/test/DotNetty.Transport.Tests/Channel/Embedded/EmbeddedChannelTest.cs index f02d80f..8b46124 100644 --- a/test/DotNetty.Transport.Tests/Channel/Embedded/EmbeddedChannelTest.cs +++ b/test/DotNetty.Transport.Tests/Channel/Embedded/EmbeddedChannelTest.cs @@ -7,6 +7,7 @@ namespace DotNetty.Transport.Tests.Channel.Embedded using System.Threading; using System.Threading.Tasks; using DotNetty.Common; + using DotNetty.Common.Concurrency; using DotNetty.Common.Utilities; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Embedded; @@ -73,6 +74,38 @@ namespace DotNetty.Transport.Tests.Channel.Embedded Assert.True(future.IsCanceled); } + [Fact] + public async Task TestScheduledCancelledDirectly() + { + var ch = new EmbeddedChannel(new ChannelHandlerAdapter()); + + IScheduledTask task1 = ch.EventLoop.Schedule(() => { }, new TimeSpan(1)); + IScheduledTask task2 = ch.EventLoop.Schedule(() => { }, new TimeSpan(1)); + IScheduledTask task3 = ch.EventLoop.Schedule(() => { }, new TimeSpan(1)); + task2.Cancel(); + ch.RunPendingTasks(); + Task checkTask1 = ch.EventLoop.SubmitAsync(() => task1.Completion.IsCompleted); + Task checkTask2 = ch.EventLoop.SubmitAsync(() => task2.Completion.IsCanceled); + Task checkTask3 = ch.EventLoop.SubmitAsync(() => task3.Completion.IsCompleted); + ch.RunPendingTasks(); + ch.CheckException(); + Assert.True(await checkTask1); + Assert.True(await checkTask2); + Assert.True(await checkTask3); + } + + [Fact] + public async Task TestScheduledCancelledAsync() + { + var ch = new EmbeddedChannel(new ChannelHandlerAdapter()); + var cts = new CancellationTokenSource(); + Task task = ch.EventLoop.ScheduleAsync(() => { }, TimeSpan.FromDays(1), cts.Token); + await Task.Run(() => cts.Cancel()); + var checkTask = ch.EventLoop.SubmitAsync(() => task.IsCanceled); + ch.RunPendingTasks(); + Assert.True(await checkTask); + } + class ChannelHandler3 : ChannelHandlerAdapter { readonly CountdownEvent latch;