зеркало из https://github.com/Azure/DotNetty.git
new AffinitizedEventLoopGroup; Refining EE/G and EL/G API and docs (#189)
Motivation: In proxy scenarios it is often important to avoid unnecessary switching between event loops while passing traffic from client and server and back. Modifications: - Added AffinitizedEventLoopGroup - introduced IEventExecutor.Parent and IEventLoop.Parent - better xml-docs for executors and event loops. - extra: turned off xml-doc warning Result: Proxy scenario is better supported out of the box.
This commit is contained in:
Родитель
5fb1c99379
Коммит
3767d89a1c
|
@ -19,7 +19,8 @@
|
|||
},
|
||||
"buildOptions": {
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": [ "CS1591" ]
|
||||
},
|
||||
"dependencies": {
|
||||
"DotNetty.Common": {
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
},
|
||||
"buildOptions": {
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": [ "CS1591" ]
|
||||
},
|
||||
"dependencies": {
|
||||
"DotNetty.Common": {
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
},
|
||||
"buildOptions": {
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": [ "CS1591" ]
|
||||
},
|
||||
"dependencies": {
|
||||
"DotNetty.Common": {
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
},
|
||||
"buildOptions": {
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": [ "CS1591" ]
|
||||
},
|
||||
"dependencies": {
|
||||
"DotNetty.Common": {
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
},
|
||||
"buildOptions": {
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": [ "CS1591" ]
|
||||
},
|
||||
"dependencies": {
|
||||
"DotNetty.Common": {
|
||||
|
|
|
@ -20,7 +20,8 @@
|
|||
"buildOptions": {
|
||||
"allowUnsafe": true,
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": [ "CS1591" ]
|
||||
},
|
||||
"dependencies": {
|
||||
"DotNetty.Common": {
|
||||
|
|
|
@ -6,81 +6,116 @@ namespace DotNetty.Common.Concurrency
|
|||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Thread = DotNetty.Common.Concurrency.XThread;
|
||||
using Thread = XThread;
|
||||
|
||||
/// <summary>
|
||||
/// Abstract base class for <see cref="IEventExecutor" /> implementations
|
||||
/// </summary>
|
||||
public abstract class AbstractEventExecutor : IEventExecutor
|
||||
{
|
||||
protected static readonly TimeSpan DefaultShutdownQuietPeriod = TimeSpan.FromSeconds(2);
|
||||
protected static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(15);
|
||||
static readonly TimeSpan DefaultShutdownQuietPeriod = TimeSpan.FromSeconds(2);
|
||||
static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(15);
|
||||
|
||||
//TODO: support for EventExecutorGroup
|
||||
/// <summary>Creates an instance of <see cref="AbstractEventExecutor"/>.</summary>
|
||||
protected AbstractEventExecutor()
|
||||
: this(null)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates an instance of <see cref="AbstractEventExecutor"/>.</summary>
|
||||
protected AbstractEventExecutor(IEventExecutorGroup parent)
|
||||
{
|
||||
this.Parent = parent;
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public bool InEventLoop => this.IsInEventLoop(Thread.CurrentThread);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public abstract bool IsShuttingDown { get; }
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public abstract Task TerminationCompletion { get; }
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public abstract bool IsShutdown { get; }
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public abstract bool IsTerminated { get; }
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public IEventExecutorGroup Parent { get; }
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public abstract bool IsInEventLoop(Thread thread);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public abstract void Execute(IRunnable task);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public void Execute(Action<object> action, object state) => this.Execute(new StateActionTaskQueueNode(action, state));
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public void Execute(Action<object, object> action, object context, object state) => this.Execute(new StateActionWithContextTaskQueueNode(action, context, state));
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public void Execute(Action action) => this.Execute(new ActionTaskQueueNode(action));
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual IScheduledTask Schedule(IRunnable action, TimeSpan delay)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual IScheduledTask Schedule(Action action, TimeSpan delay)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual Task ScheduleAsync(Action action, TimeSpan delay) => this.ScheduleAsync(action, delay, CancellationToken.None);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual Task ScheduleAsync(Action<object> action, object state, TimeSpan delay) => this.ScheduleAsync(action, state, delay, CancellationToken.None);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay) => this.ScheduleAsync(action, context, state, delay, CancellationToken.None);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public virtual Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public Task<T> SubmitAsync<T>(Func<T> func) => this.SubmitAsync(func, CancellationToken.None);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public Task<T> SubmitAsync<T>(Func<T> func, CancellationToken cancellationToken)
|
||||
{
|
||||
var node = new FuncSubmitQueueNode<T>(func, cancellationToken);
|
||||
|
@ -88,8 +123,10 @@ namespace DotNetty.Common.Concurrency
|
|||
return node.Completion;
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public Task<T> SubmitAsync<T>(Func<object, T> func, object state) => this.SubmitAsync(func, state, CancellationToken.None);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public Task<T> SubmitAsync<T>(Func<object, T> func, object state, CancellationToken cancellationToken)
|
||||
{
|
||||
var node = new StateFuncSubmitQueueNode<T>(func, state, cancellationToken);
|
||||
|
@ -97,8 +134,10 @@ namespace DotNetty.Common.Concurrency
|
|||
return node.Completion;
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state) => this.SubmitAsync(func, context, state, CancellationToken.None);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state, CancellationToken cancellationToken)
|
||||
{
|
||||
var node = new StateFuncWithContextSubmitQueueNode<T>(func, context, state, cancellationToken);
|
||||
|
@ -106,10 +145,13 @@ namespace DotNetty.Common.Concurrency
|
|||
return node.Completion;
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public Task ShutdownGracefullyAsync() => this.ShutdownGracefullyAsync(DefaultShutdownQuietPeriod, DefaultShutdownTimeout);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public abstract Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
protected void SetCurrentExecutor(IEventExecutor executor) => ExecutionEnvironment.SetCurrentExecutor(executor);
|
||||
|
||||
#region Queuing data structures
|
||||
|
|
|
@ -17,7 +17,14 @@ namespace DotNetty.Common.Concurrency
|
|||
{
|
||||
protected readonly PriorityQueue<IScheduledRunnable> ScheduledTaskQueue = new PriorityQueue<IScheduledRunnable>();
|
||||
|
||||
// TODO: support for EventExecutorGroup
|
||||
protected AbstractScheduledEventExecutor()
|
||||
{
|
||||
}
|
||||
|
||||
protected AbstractScheduledEventExecutor(IEventExecutorGroup parent)
|
||||
: base(parent)
|
||||
{
|
||||
}
|
||||
|
||||
protected static PreciseTimeSpan GetNanos() => PreciseTimeSpan.FromStart;
|
||||
|
||||
|
|
|
@ -43,6 +43,11 @@ namespace DotNetty.Common.Concurrency
|
|||
/// </remarks>
|
||||
bool IsTerminated { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Parent <see cref="IEventExecutorGroup"/>.
|
||||
/// </summary>
|
||||
IEventExecutorGroup Parent { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Returns <c>true</c> if the given <see cref="Thread" /> belongs to this event loop,
|
||||
/// <c>false></c> otherwise.
|
||||
|
|
|
@ -6,14 +6,31 @@ namespace DotNetty.Common.Concurrency
|
|||
using System;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
/// <summary>
|
||||
/// Provides an access to a set of <see cref="IEventExecutor"/>s it manages.
|
||||
/// </summary>
|
||||
public interface IEventExecutorGroup
|
||||
{
|
||||
/// <summary>
|
||||
/// A <see cref="Task"/> for completion of termination. <see cref="ShutdownGracefullyAsync()"/>.
|
||||
/// </summary>
|
||||
Task TerminationCompletion { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Returns <see cref="IEventExecutor"/>.
|
||||
/// </summary>
|
||||
IEventExecutor GetNext();
|
||||
|
||||
/// <summary>
|
||||
/// Terminates this <see cref="IEventExecutorGroup"/> and all its <see cref="IEventExecutor"/>s.
|
||||
/// </summary>
|
||||
/// <returns><see cref="Task"/> for completion of termination.</returns>
|
||||
Task ShutdownGracefullyAsync();
|
||||
|
||||
/// <summary>
|
||||
/// Terminates this <see cref="IEventExecutorGroup"/> and all its <see cref="IEventExecutor"/>s.
|
||||
/// </summary>
|
||||
/// <returns><see cref="Task"/> for completion of termination.</returns>
|
||||
Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);
|
||||
}
|
||||
}
|
|
@ -9,8 +9,11 @@ namespace DotNetty.Common.Concurrency
|
|||
using System.Threading.Tasks;
|
||||
using DotNetty.Common.Internal;
|
||||
using DotNetty.Common.Internal.Logging;
|
||||
using Thread = DotNetty.Common.Concurrency.XThread;
|
||||
using Thread = XThread;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IEventExecutor"/> backed by a single thread.
|
||||
/// </summary>
|
||||
public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
|
||||
{
|
||||
#pragma warning disable 420 // referencing volatile fields is fine in Interlocked methods
|
||||
|
@ -39,12 +42,24 @@ namespace DotNetty.Common.Concurrency
|
|||
PreciseTimeSpan gracefulShutdownQuietPeriod;
|
||||
PreciseTimeSpan gracefulShutdownTimeout;
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventExecutor"/>.</summary>
|
||||
public SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval)
|
||||
: this(threadName, breakoutInterval, new CompatibleConcurrentQueue<IRunnable>())
|
||||
: this(null, threadName, breakoutInterval, new CompatibleConcurrentQueue<IRunnable>())
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventExecutor"/>.</summary>
|
||||
public SingleThreadEventExecutor(IEventExecutorGroup parent, string threadName, TimeSpan breakoutInterval)
|
||||
: this(parent, threadName, breakoutInterval, new CompatibleConcurrentQueue<IRunnable>())
|
||||
{
|
||||
}
|
||||
|
||||
protected SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> taskQueue)
|
||||
: this(null, threadName, breakoutInterval, taskQueue)
|
||||
{ }
|
||||
|
||||
protected SingleThreadEventExecutor(IEventExecutorGroup parent, string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> taskQueue)
|
||||
: base(parent)
|
||||
{
|
||||
this.terminationCompletionSource = new TaskCompletionSource();
|
||||
this.taskQueue = taskQueue;
|
||||
|
@ -86,16 +101,22 @@ namespace DotNetty.Common.Concurrency
|
|||
this.scheduler);
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public override bool IsShuttingDown => this.executionState >= ST_SHUTTING_DOWN;
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public override Task TerminationCompletion => this.terminationCompletionSource.Task;
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public override bool IsShutdown => this.executionState >= ST_SHUTDOWN;
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public override bool IsTerminated => this.executionState == ST_TERMINATED;
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public override bool IsInEventLoop(Thread t) => this.thread == t;
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public override void Execute(IRunnable task)
|
||||
{
|
||||
this.taskQueue.TryEnqueue(task);
|
||||
|
@ -114,6 +135,7 @@ namespace DotNetty.Common.Concurrency
|
|||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventExecutor"/>
|
||||
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
|
||||
{
|
||||
Contract.Requires(quietPeriod >= TimeSpan.Zero);
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
},
|
||||
"buildOptions": {
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": [ "CS1591" ]
|
||||
},
|
||||
"dependencies": {
|
||||
"Microsoft.Extensions.Logging": "1.0.0"
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
},
|
||||
"buildOptions": {
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": [ "CS1591" ]
|
||||
},
|
||||
"dependencies": {
|
||||
"DotNetty.Common": {
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
namespace DotNetty.Transport.Channels
|
||||
{
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using DotNetty.Common.Concurrency;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IEventLoopGroup"/> that works as a wrapper for another <see cref="IEventLoopGroup"/> providing affinity on <see cref="GetNext"/> call.
|
||||
/// </summary>
|
||||
public class AffinitizedEventLoopGroup : IEventLoopGroup
|
||||
{
|
||||
readonly IEventLoopGroup innerGroup;
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new instance of <see cref="AffinitizedEventLoopGroup"/>.
|
||||
/// </summary>
|
||||
/// <param name="innerGroup"><see cref="IEventLoopGroup"/> serving as an actual provider of <see cref="IEventLoop"/>s.</param>
|
||||
public AffinitizedEventLoopGroup(IEventLoopGroup innerGroup)
|
||||
{
|
||||
this.innerGroup = innerGroup;
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventLoopGroup"/>
|
||||
public Task TerminationCompletion => this.innerGroup.TerminationCompletion;
|
||||
|
||||
IEventExecutor IEventExecutorGroup.GetNext() => this.GetNext();
|
||||
|
||||
/// <summary>
|
||||
/// If running in a context of an existing <see cref="IEventLoop"/>, this <see cref="IEventLoop"/> is returned.
|
||||
/// Otherwise, <see cref="IEventLoop"/> is retrieved from underlying <see cref="IEventLoopGroup"/>.
|
||||
/// </summary>
|
||||
public IEventLoop GetNext()
|
||||
{
|
||||
IEventExecutor executor;
|
||||
if (ExecutionEnvironment.TryGetCurrentExecutor(out executor))
|
||||
{
|
||||
var loop = executor as IEventLoop;
|
||||
if (loop != null && loop.Parent == this.innerGroup)
|
||||
{
|
||||
return loop;
|
||||
}
|
||||
}
|
||||
return this.innerGroup.GetNext();
|
||||
}
|
||||
|
||||
/// <inheritdoc cref="IEventLoopGroup"/>
|
||||
public Task ShutdownGracefullyAsync() => this.innerGroup.ShutdownGracefullyAsync();
|
||||
|
||||
/// <inheritdoc cref="IEventLoopGroup"/>
|
||||
public Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout) => this.innerGroup.ShutdownGracefullyAsync(quietPeriod, timeout);
|
||||
}
|
||||
}
|
|
@ -5,7 +5,6 @@ namespace DotNetty.Transport.Channels.Embedded
|
|||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using DotNetty.Common;
|
||||
using DotNetty.Common.Concurrency;
|
||||
|
@ -30,6 +29,8 @@ namespace DotNetty.Transport.Channels.Embedded
|
|||
|
||||
public override bool IsTerminated => false;
|
||||
|
||||
public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;
|
||||
|
||||
public override bool IsInEventLoop(Thread thread) => true;
|
||||
|
||||
public override void Execute(IRunnable command)
|
||||
|
|
|
@ -6,8 +6,21 @@ namespace DotNetty.Transport.Channels
|
|||
using System.Threading.Tasks;
|
||||
using DotNetty.Common.Concurrency;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IEventExecutor"/> specialized to handle I/O operations of assigned <see cref="IChannel"/>s.
|
||||
/// </summary>
|
||||
public interface IEventLoop : IEventExecutor
|
||||
{
|
||||
/// <summary>
|
||||
/// Parent <see cref="IEventLoopGroup"/>.
|
||||
/// </summary>
|
||||
new IEventLoopGroup Parent { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Registers provided <see cref="IChannel"/> with this <see cref="IEventLoop"/>.
|
||||
/// </summary>
|
||||
/// <param name="channel"><see cref="IChannel"/> to register.</param>
|
||||
/// <returns><see cref="Task"/> for completion of registration.</returns>
|
||||
Task RegisterAsync(IChannel channel);
|
||||
}
|
||||
}
|
|
@ -5,8 +5,14 @@ namespace DotNetty.Transport.Channels
|
|||
{
|
||||
using DotNetty.Common.Concurrency;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IEventExecutorGroup"/> specialized for handling <see cref="IEventLoop"/>s.
|
||||
/// </summary>
|
||||
public interface IEventLoopGroup : IEventExecutorGroup
|
||||
{
|
||||
/// <summary>
|
||||
/// Returns <see cref="IEventLoop"/>.
|
||||
/// </summary>
|
||||
new IEventLoop GetNext();
|
||||
}
|
||||
}
|
|
@ -9,30 +9,37 @@ namespace DotNetty.Transport.Channels
|
|||
using System.Threading.Tasks;
|
||||
using DotNetty.Common.Concurrency;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IEventLoopGroup"/> backed by a set of <see cref="SingleThreadEventLoop"/> instances.
|
||||
/// </summary>
|
||||
public sealed class MultithreadEventLoopGroup : IEventLoopGroup
|
||||
{
|
||||
static readonly int DefaultEventLoopThreadCount = Environment.ProcessorCount * 2;
|
||||
static readonly Func<IEventLoop> DefaultEventLoopFactory = () => new SingleThreadEventLoop();
|
||||
static readonly Func<IEventLoopGroup, IEventLoop> DefaultEventLoopFactory = group => new SingleThreadEventLoop(group);
|
||||
|
||||
readonly IEventLoop[] eventLoops;
|
||||
int requestId;
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="MultithreadEventLoopGroup"/>.</summary>
|
||||
public MultithreadEventLoopGroup()
|
||||
: this(DefaultEventLoopFactory, DefaultEventLoopThreadCount)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="MultithreadEventLoopGroup"/>.</summary>
|
||||
public MultithreadEventLoopGroup(int eventLoopCount)
|
||||
: this(DefaultEventLoopFactory, eventLoopCount)
|
||||
{
|
||||
}
|
||||
|
||||
public MultithreadEventLoopGroup(Func<IEventLoop> eventLoopFactory)
|
||||
/// <summary>Creates a new instance of <see cref="MultithreadEventLoopGroup"/>.</summary>
|
||||
public MultithreadEventLoopGroup(Func<IEventLoopGroup, IEventLoop> eventLoopFactory)
|
||||
: this(eventLoopFactory, DefaultEventLoopThreadCount)
|
||||
{
|
||||
}
|
||||
|
||||
public MultithreadEventLoopGroup(Func<IEventLoop> eventLoopFactory, int eventLoopCount)
|
||||
/// <summary>Creates a new instance of <see cref="MultithreadEventLoopGroup"/>.</summary>
|
||||
public MultithreadEventLoopGroup(Func<IEventLoopGroup, IEventLoop> eventLoopFactory, int eventLoopCount)
|
||||
{
|
||||
this.eventLoops = new IEventLoop[eventLoopCount];
|
||||
var terminationTasks = new Task[eventLoopCount];
|
||||
|
@ -42,7 +49,7 @@ namespace DotNetty.Transport.Channels
|
|||
bool success = false;
|
||||
try
|
||||
{
|
||||
eventLoop = eventLoopFactory();
|
||||
eventLoop = eventLoopFactory(this);
|
||||
success = true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
@ -54,8 +61,8 @@ namespace DotNetty.Transport.Channels
|
|||
if (!success)
|
||||
{
|
||||
Task.WhenAll(this.eventLoops
|
||||
.Take(i)
|
||||
.Select(loop => loop.ShutdownGracefullyAsync()))
|
||||
.Take(i)
|
||||
.Select(loop => loop.ShutdownGracefullyAsync()))
|
||||
.Wait();
|
||||
}
|
||||
}
|
||||
|
@ -66,16 +73,20 @@ namespace DotNetty.Transport.Channels
|
|||
this.TerminationCompletion = Task.WhenAll(terminationTasks);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task TerminationCompletion { get; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public IEventLoop GetNext()
|
||||
{
|
||||
int id = Interlocked.Increment(ref this.requestId);
|
||||
return this.eventLoops[Math.Abs(id % this.eventLoops.Length)];
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
IEventExecutor IEventExecutorGroup.GetNext() => this.GetNext();
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task ShutdownGracefullyAsync()
|
||||
{
|
||||
foreach (IEventLoop eventLoop in this.eventLoops)
|
||||
|
@ -85,6 +96,7 @@ namespace DotNetty.Transport.Channels
|
|||
return this.TerminationCompletion;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
|
||||
{
|
||||
foreach (IEventLoop eventLoop in this.eventLoops)
|
||||
|
|
|
@ -8,30 +8,65 @@ namespace DotNetty.Transport.Channels
|
|||
using DotNetty.Common.Concurrency;
|
||||
using DotNetty.Common.Internal;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IEventLoop"/> implementation based on <see cref="SingleThreadEventExecutor"/>.
|
||||
/// </summary>
|
||||
public class SingleThreadEventLoop : SingleThreadEventExecutor, IEventLoop
|
||||
{
|
||||
static readonly TimeSpan DefaultBreakoutInterval = TimeSpan.FromMilliseconds(100);
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventLoop"/>.</summary>
|
||||
public SingleThreadEventLoop()
|
||||
: this(null, DefaultBreakoutInterval)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventLoop"/>.</summary>
|
||||
public SingleThreadEventLoop(string threadName)
|
||||
: this(threadName, DefaultBreakoutInterval)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventLoop"/>.</summary>
|
||||
public SingleThreadEventLoop(string threadName, TimeSpan breakoutInterval)
|
||||
: base(threadName, breakoutInterval)
|
||||
{
|
||||
}
|
||||
|
||||
protected SingleThreadEventLoop(string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> taskQueue)
|
||||
: base(threadName, breakoutInterval, taskQueue)
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventLoop"/>.</summary>
|
||||
public SingleThreadEventLoop(IEventLoopGroup parent)
|
||||
: this(parent, null, DefaultBreakoutInterval)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventLoop"/>.</summary>
|
||||
public SingleThreadEventLoop(IEventLoopGroup parent, string threadName)
|
||||
: this(parent, threadName, DefaultBreakoutInterval)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventLoop"/>.</summary>
|
||||
public SingleThreadEventLoop(IEventLoopGroup parent, string threadName, TimeSpan breakoutInterval)
|
||||
: base(parent, threadName, breakoutInterval)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventLoop"/>.</summary>
|
||||
protected SingleThreadEventLoop(string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> taskQueue)
|
||||
: base(null, threadName, breakoutInterval, taskQueue)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>Creates a new instance of <see cref="SingleThreadEventLoop"/>.</summary>
|
||||
protected SingleThreadEventLoop(IEventLoopGroup parent, string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> taskQueue)
|
||||
: base(parent, threadName, breakoutInterval, taskQueue)
|
||||
{
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);
|
||||
|
||||
/// <inheritdoc />
|
||||
public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;
|
||||
}
|
||||
}
|
|
@ -19,7 +19,8 @@
|
|||
},
|
||||
"buildOptions": {
|
||||
"keyFile": "../../DotNetty.snk",
|
||||
"xmlDoc": true
|
||||
"xmlDoc": true,
|
||||
"nowarn": ["CS1591"]
|
||||
},
|
||||
"dependencies": {
|
||||
"DotNetty.Common": {
|
||||
|
|
Загрузка…
Ссылка в новой задаче