Adds IEventExecutor.Schedule, proper cancellation of scheduled tasks

Motivation:
Allow for less GC when scheduling if Task-based API is not required. Allow for proper cleanup when canceling scheduled task so that associated resources can be cleaned up early.

Modifications:
- Moved out scheduled task implementations from AbstractScheduledEventExecutor
- Introduced IScheduledTask and aligned IScheduledRunnable
- Implemented removal of scheduled task upon its cancellation
- Extra minor fixes
 - IByteBuffer.ToString(Encoding) honoring ArrayOffset
 - InternalLoggerFactory creating default logging factory lazily

Result:
Scheduled tasks that normally never fire (e.g. timeout handling) do not cause resources to be held up unnecessarily in memory for longer period of time if not necessary.
This commit is contained in:
mgortman 2016-04-04 02:42:54 -07:00
Родитель ae0798eadb
Коммит d5a4c30778
23 изменённых файлов: 615 добавлений и 171 удалений

Просмотреть файл

@ -579,7 +579,7 @@ namespace DotNetty.Buffers
if (src.HasArray)
{
return encoding.GetString(src.Array, readerIndex, len);
return encoding.GetString(src.Array, src.ArrayOffset + readerIndex, len);
}
else
{

Просмотреть файл

@ -55,6 +55,21 @@ namespace DotNetty.Common.Concurrency
this.Execute(new ActionTaskQueueNode(action));
}
public virtual IScheduledTask Schedule(Action action, TimeSpan delay)
{
throw new NotSupportedException();
}
public virtual IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
throw new NotSupportedException();
}
public virtual IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
throw new NotSupportedException();
}
public virtual Task ScheduleAsync(Action action, TimeSpan delay)
{
return this.ScheduleAsync(action, delay, CancellationToken.None);

Просмотреть файл

@ -1,4 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
@ -15,8 +15,6 @@ namespace DotNetty.Common.Concurrency
/// </summary>
public abstract class AbstractScheduledEventExecutor : AbstractEventExecutor
{
static readonly Action<object, object> AddScheduledTaskAction = (e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t);
protected readonly PriorityQueue<IScheduledRunnable> ScheduledTaskQueue = new PriorityQueue<IScheduledRunnable>();
// TODO: support for EventExecutorGroup
@ -94,172 +92,89 @@ namespace DotNetty.Common.Concurrency
return scheduledTask != null && scheduledTask.Deadline <= PreciseTimeSpan.FromStart;
}
public override IScheduledTask Schedule(Action action, TimeSpan delay)
{
return this.Schedule(new ActionScheduledTask(this, action, PreciseTimeSpan.Deadline(delay)));
}
public override IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
return this.Schedule(new StateActionScheduledTask(this, action, state, PreciseTimeSpan.Deadline(delay)));
}
public override IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
return this.Schedule(new StateActionWithContextScheduledTask(this, action, context, state, PreciseTimeSpan.Deadline(delay)));
}
public override Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
{
var scheduledTask = new ActionScheduledTask(action, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (this.InEventLoop)
if (cancellationToken.IsCancellationRequested)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
return TaskEx.Cancelled;
}
else
if (!cancellationToken.CanBeCanceled)
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
return this.Schedule(action, delay).Completion;
}
return scheduledTask.Completion;
return this.Schedule(new ActionScheduledAsyncTask(this, action, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}
public override Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
{
var scheduledTask = new StateActionScheduledTask(action, state, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (this.InEventLoop)
if (cancellationToken.IsCancellationRequested)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
return TaskEx.Cancelled;
}
else
if (!cancellationToken.CanBeCanceled)
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
return this.Schedule(action, state, delay).Completion;
}
return scheduledTask.Completion;
return this.Schedule(new StateActionScheduledAsyncTask(this, action, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}
public override Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
{
var scheduledTask = new StateActionWithContextScheduledTask(action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (cancellationToken.IsCancellationRequested)
{
return TaskEx.Cancelled;
}
if (!cancellationToken.CanBeCanceled)
{
return this.Schedule(action, context, state, delay).Completion;
}
return this.Schedule(new StateActionWithContextScheduledAsyncTask(this, action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}
protected IScheduledRunnable Schedule(IScheduledRunnable task)
{
if (this.InEventLoop)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
this.ScheduledTaskQueue.Enqueue(task);
}
else
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t), this, task);
}
return scheduledTask.Completion;
return task;
}
#region Scheduled task data structures
protected interface IScheduledRunnable : IRunnable, IComparable<IScheduledRunnable>
internal void RemoveScheduled(IScheduledRunnable task)
{
PreciseTimeSpan Deadline { get; }
bool Cancel();
}
protected abstract class ScheduledTaskBase : MpscLinkedQueueNode<IRunnable>, IScheduledRunnable
{
readonly TaskCompletionSource promise;
protected ScheduledTaskBase(PreciseTimeSpan deadline, TaskCompletionSource promise, CancellationToken cancellationToken)
if (this.InEventLoop)
{
this.promise = promise;
this.Deadline = deadline;
this.CancellationToken = cancellationToken;
this.ScheduledTaskQueue.Remove(task);
}
public PreciseTimeSpan Deadline { get; private set; }
public bool Cancel()
else
{
return this.promise.TrySetCanceled();
}
public Task Completion
{
get { return this.promise.Task; }
}
public CancellationToken CancellationToken { get; private set; }
int IComparable<IScheduledRunnable>.CompareTo(IScheduledRunnable other)
{
Contract.Requires(other != null);
return this.Deadline.CompareTo(other.Deadline);
}
public override IRunnable Value
{
get { return this; }
}
public void Run()
{
if (this.CancellationToken.IsCancellationRequested)
{
this.promise.TrySetCanceled();
return;
}
if (this.Completion.IsCanceled)
{
return;
}
try
{
this.Execute();
this.promise.TryComplete();
}
catch (Exception ex)
{
// todo: check for fatal
this.promise.TrySetException(ex);
}
}
protected abstract void Execute();
}
sealed class ActionScheduledTask : ScheduledTaskBase
{
readonly Action action;
public ActionScheduledTask(Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(), cancellationToken)
{
this.action = action;
}
protected override void Execute()
{
this.action();
this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Remove((IScheduledRunnable)t), this, task);
}
}
sealed class StateActionScheduledTask : ScheduledTaskBase
{
readonly Action<object> action;
public StateActionScheduledTask(Action<object> action, object state, PreciseTimeSpan deadline,
CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(state), cancellationToken)
{
this.action = action;
}
protected override void Execute()
{
this.action(this.Completion.AsyncState);
}
}
sealed class StateActionWithContextScheduledTask : ScheduledTaskBase
{
readonly Action<object, object> action;
readonly object context;
public StateActionWithContextScheduledTask(Action<object, object> action, object context, object state,
PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(state), cancellationToken)
{
this.action = action;
this.context = context;
}
protected override void Execute()
{
this.action(this.context, this.Completion.AsyncState);
}
}
#endregion
}
}

Просмотреть файл

@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;
sealed class ActionScheduledAsyncTask : ScheduledAsyncTask
{
readonly Action action;
public ActionScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(executor, deadline, new TaskCompletionSource(), cancellationToken)
{
this.action = action;
}
protected override void Execute()
{
this.action();
}
}
}

Просмотреть файл

@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
sealed class ActionScheduledTask : ScheduledTask
{
readonly Action action;
public ActionScheduledTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline)
: base(executor, deadline, new TaskCompletionSource())
{
this.action = action;
}
protected override void Execute()
{
this.action();
}
}
}

Просмотреть файл

@ -105,6 +105,34 @@ namespace DotNetty.Common.Concurrency
/// </remarks>
void Execute(Action<object, object> action, object context, object state);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action action, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state"/> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context"/> and <paramref name="state"/> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>

Просмотреть файл

@ -0,0 +1,11 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
public interface IScheduledRunnable : IRunnable, IScheduledTask, IComparable<IScheduledRunnable>
{
}
}

Просмотреть файл

@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
public interface IScheduledTask
{
bool Cancel();
PreciseTimeSpan Deadline { get; }
Task Completion { get; }
TaskAwaiter GetAwaiter();
}
}

Просмотреть файл

@ -0,0 +1,33 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System.Threading;
abstract class ScheduledAsyncTask : ScheduledTask
{
readonly CancellationToken cancellationToken;
CancellationTokenRegistration cancellationTokenRegistration;
protected ScheduledAsyncTask(AbstractScheduledEventExecutor executor, PreciseTimeSpan deadline, TaskCompletionSource promise, CancellationToken cancellationToken)
: base(executor, deadline, promise)
{
this.cancellationToken = cancellationToken;
this.cancellationTokenRegistration = cancellationToken.Register(s => ((ScheduledAsyncTask)s).Cancel(), this);
}
public override void Run()
{
this.cancellationTokenRegistration.Dispose();
if (this.cancellationToken.IsCancellationRequested)
{
this.Promise.TrySetCanceled();
}
else
{
base.Run();
}
}
}
}

Просмотреть файл

@ -0,0 +1,110 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Diagnostics.Contracts;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Utilities;
abstract class ScheduledTask : MpscLinkedQueueNode<IRunnable>, IScheduledRunnable
{
const int CancellationProhibited = 1;
const int CancellationRequested = 1 << 1;
protected readonly TaskCompletionSource Promise;
protected readonly AbstractScheduledEventExecutor Executor;
int volatileCancellationState;
protected ScheduledTask(AbstractScheduledEventExecutor executor, PreciseTimeSpan deadline, TaskCompletionSource promise)
{
this.Executor = executor;
this.Promise = promise;
this.Deadline = deadline;
}
public PreciseTimeSpan Deadline { get; private set; }
public bool Cancel()
{
if (!this.AtomicCancellationStateUpdate(CancellationProhibited, CancellationRequested))
{
return false;
}
bool canceled = this.Promise.TrySetCanceled();
if (canceled)
{
this.Executor.RemoveScheduled(this);
}
return canceled;
}
public Task Completion
{
get { return this.Promise.Task; }
}
public TaskAwaiter GetAwaiter()
{
return this.Completion.GetAwaiter();
}
int IComparable<IScheduledRunnable>.CompareTo(IScheduledRunnable other)
{
Contract.Requires(other != null);
return this.Deadline.CompareTo(other.Deadline);
}
public override IRunnable Value
{
get { return this; }
}
public virtual void Run()
{
if (this.TrySetUncancelable())
{
try
{
this.Execute();
this.Promise.TryComplete();
}
catch (Exception ex)
{
// todo: check for fatal
this.Promise.TrySetException(ex);
}
}
}
protected abstract void Execute();
bool TrySetUncancelable()
{
return this.AtomicCancellationStateUpdate(CancellationProhibited, CancellationRequested);
}
bool AtomicCancellationStateUpdate(int newBits, int illegalBits)
{
int cancellationState = Volatile.Read(ref this.volatileCancellationState);
int oldCancellationState;
do
{
oldCancellationState = cancellationState;
if ((cancellationState & illegalBits) != 0)
{
return false;
}
cancellationState = Interlocked.CompareExchange(ref this.volatileCancellationState, cancellationState | newBits, cancellationState);
}
while (cancellationState != oldCancellationState);
return true;
}
}
}

Просмотреть файл

@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;
sealed class StateActionScheduledAsyncTask : ScheduledAsyncTask
{
readonly Action<object> action;
public StateActionScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action<object> action, object state, PreciseTimeSpan deadline,
CancellationToken cancellationToken)
: base(executor, deadline, new TaskCompletionSource(state), cancellationToken)
{
this.action = action;
}
protected override void Execute()
{
this.action(this.Completion.AsyncState);
}
}
}

Просмотреть файл

@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
sealed class StateActionScheduledTask : ScheduledTask
{
readonly Action<object> action;
public StateActionScheduledTask(AbstractScheduledEventExecutor executor, Action<object> action, object state, PreciseTimeSpan deadline)
: base(executor, deadline, new TaskCompletionSource(state))
{
this.action = action;
}
protected override void Execute()
{
this.action(this.Completion.AsyncState);
}
}
}

Просмотреть файл

@ -0,0 +1,27 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;
sealed class StateActionWithContextScheduledAsyncTask : ScheduledAsyncTask
{
readonly Action<object, object> action;
readonly object context;
public StateActionWithContextScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action<object, object> action, object context, object state,
PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(executor, deadline, new TaskCompletionSource(state), cancellationToken)
{
this.action = action;
this.context = context;
}
protected override void Execute()
{
this.action(this.context, this.Completion.AsyncState);
}
}
}

Просмотреть файл

@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
sealed class StateActionWithContextScheduledTask : ScheduledTask
{
readonly Action<object, object> action;
readonly object context;
public StateActionWithContextScheduledTask(AbstractScheduledEventExecutor executor, Action<object, object> action, object context, object state,
PreciseTimeSpan deadline)
: base(executor, deadline, new TaskCompletionSource(state))
{
this.action = action;
this.context = context;
}
protected override void Execute()
{
this.action(this.context, this.Completion.AsyncState);
}
}
}

Просмотреть файл

@ -125,7 +125,17 @@
<Compile Include="..\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="Concurrency\ActionScheduledAsyncTask.cs" />
<Compile Include="Concurrency\ActionScheduledTask.cs" />
<Compile Include="Concurrency\ICallable`T.cs" />
<Compile Include="Concurrency\IScheduledRunnable.cs" />
<Compile Include="Concurrency\IScheduledTask.cs" />
<Compile Include="Concurrency\ScheduledAsyncTask.cs" />
<Compile Include="Concurrency\ScheduledTask.cs" />
<Compile Include="Concurrency\StateActionScheduledAsyncTask.cs" />
<Compile Include="Concurrency\StateActionScheduledTask.cs" />
<Compile Include="Concurrency\StateActionWithContextScheduledAsyncTask.cs" />
<Compile Include="Concurrency\StateActionWithContextScheduledTask.cs" />
<Compile Include="Deque.cs" />
<Compile Include="Properties\Friends.cs" />
<Compile Include="Concurrency\AbstractEventExecutor.cs" />

Просмотреть файл

@ -5,6 +5,8 @@ namespace DotNetty.Common.Internal.Logging
{
using System;
using System.Diagnostics.Contracts;
using System.Runtime.CompilerServices;
using System.Threading;
/// <summary>
/// Creates an <see cref="IInternalLogger"/> or changes the default factory
@ -21,8 +23,7 @@ namespace DotNetty.Common.Internal.Logging
/// </summary>
public abstract class InternalLoggerFactory
{
static volatile InternalLoggerFactory defaultFactory =
NewDefaultFactory(typeof(InternalLoggerFactory).FullName);
static InternalLoggerFactory defaultFactory;
static InternalLoggerFactory()
{
@ -48,12 +49,25 @@ namespace DotNetty.Common.Internal.Logging
/// </summary>
public static InternalLoggerFactory DefaultFactory
{
get { return defaultFactory; }
get
{
InternalLoggerFactory factory = Volatile.Read(ref defaultFactory);
if (factory == null)
{
factory = NewDefaultFactory(typeof(InternalLoggerFactory).FullName);
InternalLoggerFactory current = Interlocked.CompareExchange(ref defaultFactory, factory, null);
if (current != null)
{
return current;
}
}
return factory;
}
set
{
Contract.Requires(value != null);
defaultFactory = value;
Volatile.Write(ref defaultFactory, value);
}
}

Просмотреть файл

@ -7,7 +7,6 @@ namespace DotNetty.Common.Utilities
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
public class PriorityQueue<T> : IEnumerable<T>
where T : class
@ -73,6 +72,32 @@ namespace DotNetty.Common.Utilities
this.BubbleUp(oldCount, item);
}
public void Remove(T item)
{
int index = Array.IndexOf(this.items, item);
if (index == -1)
{
return;
}
this.count--;
if (index == this.count)
{
this.items[index] = default(T);
}
else
{
T last = this.items[this.count];
this.items[this.count] = default(T);
this.TrickleDown(index, last);
if (this.items[index] == last)
{
this.BubbleUp(index, last);
}
}
}
void BubbleUp(int index, T item)
{
// index > 0 means there is a parent

Просмотреть файл

@ -54,6 +54,24 @@ namespace DotNetty.Transport.Channels
this.Unwrap().Execute(action);
}
public IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().Schedule(action, context, state, delay);
}
public IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().Schedule(action, state, delay);
}
public IScheduledTask Schedule(Action action, TimeSpan delay)
{
this.VerifyAcceptingNewTasks();
return this.Unwrap().Schedule(action, delay);
}
public Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
{
this.VerifyAcceptingNewTasks();

Просмотреть файл

@ -7,7 +7,6 @@ namespace DotNetty.Transport.Channels.Sockets
using System.Diagnostics.Contracts;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Common.Concurrency;
@ -41,7 +40,7 @@ namespace DotNetty.Transport.Channels.Sockets
volatile StateFlags state;
TaskCompletionSource connectPromise;
CancellationTokenSource connectCancellation;
IScheduledTask connectCancellationTask;
protected AbstractSocketChannel(IChannel parent, Socket socket)
: base(parent)
@ -223,7 +222,7 @@ namespace DotNetty.Transport.Channels.Sockets
get { return (AbstractSocketChannel)this.channel; }
}
public override sealed Task ConnectAsync(EndPoint remoteAddress, EndPoint localAddress)
public sealed override Task ConnectAsync(EndPoint remoteAddress, EndPoint localAddress)
{
// todo: handle cancellation
AbstractSocketChannel ch = this.Channel;
@ -253,36 +252,36 @@ namespace DotNetty.Transport.Channels.Sockets
TimeSpan connectTimeout = ch.Configuration.ConnectTimeout;
if (connectTimeout > TimeSpan.Zero)
{
CancellationTokenSource cts = ch.connectCancellation = new CancellationTokenSource();
ch.EventLoop.ScheduleAsync(
c =>
ch.connectCancellationTask = ch.EventLoop.Schedule(
(c, a) =>
{
// todo: make static / cache delegate?..
var self = (AbstractSocketChannel)c;
// todo: call Socket.CancelConnectAsync(...)
TaskCompletionSource promise = ch.connectPromise;
var cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
TaskCompletionSource promise = self.connectPromise;
var cause = new ConnectTimeoutException("connection timed out: " + a.ToString());
if (promise != null && promise.TrySetException(cause))
{
self.CloseAsync();
}
},
this.channel,
connectTimeout,
cts.Token);
remoteAddress,
connectTimeout);
}
ch.connectPromise.Task.ContinueWith(t =>
{
if (ch.connectCancellation != null)
ch.connectPromise.Task.ContinueWith(
(t, s) =>
{
ch.connectCancellation.Cancel();
}
ch.connectPromise = null;
this.channel.CloseAsync();
},
var c = (AbstractSocketChannel)s;
if (c.connectCancellationTask != null)
{
c.connectCancellationTask.Cancel();
}
c.connectPromise = null;
c.CloseAsync();
},
ch,
TaskContinuationOptions.OnlyOnCanceled | TaskContinuationOptions.ExecuteSynchronously);
return ch.connectPromise.Task;
@ -356,9 +355,9 @@ namespace DotNetty.Transport.Channels.Sockets
{
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (ch.connectCancellation != null)
if (ch.connectCancellationTask != null)
{
ch.connectCancellation.Cancel();
ch.connectCancellationTask.Cancel();
}
ch.connectPromise = null;
}
@ -461,11 +460,11 @@ namespace DotNetty.Transport.Channels.Sockets
this.connectPromise = null;
}
CancellationTokenSource cancellation = this.connectCancellation;
if (cancellation != null)
IScheduledTask cancellationTask = this.connectCancellationTask;
if (cancellationTask != null)
{
cancellation.Cancel();
this.connectCancellation = null;
cancellationTask.Cancel();
this.connectCancellationTask = null;
}
SocketChannelAsyncOperation readOp = this.readOperation;

Просмотреть файл

@ -83,6 +83,7 @@
<Compile Include="Internal\Logging\InternalLoggerFactoryTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ThreadLocalPoolTest.cs" />
<Compile Include="Utilities\PriorityQueueTest.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetty.Common\DotNetty.Common.csproj">

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Buffers.Tests
namespace DotNetty.Common.Tests
{
using System;
using System.Threading.Tasks;

Просмотреть файл

@ -0,0 +1,65 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Tests.Utilities
{
using System;
using DotNetty.Common.Utilities;
using Xunit;
public class PriorityQueueTest
{
[Theory]
[InlineData(0, -1)]
[InlineData(1, 0)]
[InlineData(1, -1)]
[InlineData(2, 0)]
[InlineData(2, 1)]
[InlineData(3, 0)]
[InlineData(3, 1)]
[InlineData(3, 2)]
[InlineData(7, 5)]
public void PriorityQueueRemoveTest(int length, int removeIndex)
{
var queue = new PriorityQueue<Tuple<int>>();
for (int i = length - 1; i >= 0; i--)
{
queue.Enqueue(Tuple.Create(i));
}
if (removeIndex == -1)
{
queue.Remove(Tuple.Create(length));
Assert.Equal(length, queue.Count);
}
else
{
queue.Remove(Tuple.Create(removeIndex));
Assert.Equal(length - 1, queue.Count);
}
}
[Theory]
[InlineData(new[] { 1, 2, 3, 4 }, new[] { 1, 2, 3, 4 })]
[InlineData(new[] { 4, 3, 2, 1 }, new[] { 1, 2, 3, 4 })]
[InlineData(new[] { 3, 2, 1 }, new[] { 1, 2, 3 })]
[InlineData(new[] { 1, 3, 2 }, new[] { 1, 2, 3 })]
[InlineData(new[] { 1, 2 }, new[] { 1, 2 })]
[InlineData(new[] { 2, 1 }, new[] { 1, 2 })]
public void PriorityQueueOrderTest(int[] input, int[] expectedOutput)
{
var queue = new PriorityQueue<Tuple<int>>();
foreach (int value in input)
{
queue.Enqueue(Tuple.Create(value));
}
for (int index = 0; index < expectedOutput.Length; index++)
{
Tuple<int> item = queue.Dequeue();
Assert.Equal(expectedOutput[index], item.Item1);
}
Assert.Equal(0, queue.Count);
}
}
}

Просмотреть файл

@ -7,6 +7,7 @@ namespace DotNetty.Transport.Tests.Channel.Embedded
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Embedded;
@ -73,6 +74,38 @@ namespace DotNetty.Transport.Tests.Channel.Embedded
Assert.True(future.IsCanceled);
}
[Fact]
public async Task TestScheduledCancelledDirectly()
{
var ch = new EmbeddedChannel(new ChannelHandlerAdapter());
IScheduledTask task1 = ch.EventLoop.Schedule(() => { }, new TimeSpan(1));
IScheduledTask task2 = ch.EventLoop.Schedule(() => { }, new TimeSpan(1));
IScheduledTask task3 = ch.EventLoop.Schedule(() => { }, new TimeSpan(1));
task2.Cancel();
ch.RunPendingTasks();
Task<bool> checkTask1 = ch.EventLoop.SubmitAsync(() => task1.Completion.IsCompleted);
Task<bool> checkTask2 = ch.EventLoop.SubmitAsync(() => task2.Completion.IsCanceled);
Task<bool> checkTask3 = ch.EventLoop.SubmitAsync(() => task3.Completion.IsCompleted);
ch.RunPendingTasks();
ch.CheckException();
Assert.True(await checkTask1);
Assert.True(await checkTask2);
Assert.True(await checkTask3);
}
[Fact]
public async Task TestScheduledCancelledAsync()
{
var ch = new EmbeddedChannel(new ChannelHandlerAdapter());
var cts = new CancellationTokenSource();
Task task = ch.EventLoop.ScheduleAsync(() => { }, TimeSpan.FromDays(1), cts.Token);
await Task.Run(() => cts.Cancel());
var checkTask = ch.EventLoop.SubmitAsync(() => task.IsCanceled);
ch.RunPendingTasks();
Assert.True(await checkTask);
}
class ChannelHandler3 : ChannelHandlerAdapter
{
readonly CountdownEvent latch;