Switch to ConcurrentQueue in STEE by default (#160)

Motivation:
as netty moved on to use JCTools directly, it is worth revisiting standard .NET concurrent collection and open queue type election in executor.

Modifications:
- Removed MpscLinkedQueue as redundant (perf is no better than with ConcurrentQueue)
- Introduced CompatibleConcurrentQueue to bridge ConcurrentQueue and IQueue<T> interface, made it the default.
- Updated STEE execution benchmark

Result:
Cleaner solution, less GC pressure from executor actions, ability to specify specialized task queue type (e.g. MpscArrayQueue on server for efficiency (at a cost of extra-mem consumption).
This commit is contained in:
Max Gortman 2016-09-07 16:39:03 -07:00 коммит произвёл GitHub
Родитель 689dccf086
Коммит 7aef9c7500
24 изменённых файлов: 227 добавлений и 458 удалений

Просмотреть файл

@ -371,8 +371,8 @@ namespace DotNetty.Buffers
public bool Allocate(PooledByteBuffer<T> buf, int reqCapacity)
{
Entry entry = this.queue.Dequeue();
if (entry == null)
Entry entry;
if (!this.queue.TryDequeue(out entry))
{
return false;
}
@ -395,8 +395,8 @@ namespace DotNetty.Buffers
int numFreed = 0;
for (; numFreed < max; numFreed++)
{
Entry entry = this.queue.Dequeue();
if (entry != null)
Entry entry;
if (this.queue.TryDequeue(out entry))
{
this.FreeEntry(entry);
}

Просмотреть файл

@ -6,7 +6,6 @@ namespace DotNetty.Common.Concurrency
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Utilities;
/// <summary>
/// Abstract base class for <see cref="IEventExecutor" /> implementations
@ -114,14 +113,7 @@ namespace DotNetty.Common.Concurrency
#region Queuing data structures
protected abstract class RunnableQueueNode : MpscLinkedQueueNode<IRunnable>, IRunnable
{
public abstract void Run();
public override IRunnable Value => this;
}
sealed class ActionTaskQueueNode : RunnableQueueNode
sealed class ActionTaskQueueNode : IRunnable
{
readonly Action action;
@ -130,10 +122,10 @@ namespace DotNetty.Common.Concurrency
this.action = action;
}
public override void Run() => this.action();
public void Run() => this.action();
}
sealed class StateActionTaskQueueNode : RunnableQueueNode
sealed class StateActionTaskQueueNode : IRunnable
{
readonly Action<object> action;
readonly object state;
@ -144,10 +136,10 @@ namespace DotNetty.Common.Concurrency
this.state = state;
}
public override void Run() => this.action(this.state);
public void Run() => this.action(this.state);
}
sealed class StateActionWithContextTaskQueueNode : RunnableQueueNode
sealed class StateActionWithContextTaskQueueNode : IRunnable
{
readonly Action<object, object> action;
readonly object context;
@ -160,10 +152,10 @@ namespace DotNetty.Common.Concurrency
this.state = state;
}
public override void Run() => this.action(this.context, this.state);
public void Run() => this.action(this.context, this.state);
}
abstract class FuncQueueNodeBase<T> : RunnableQueueNode
abstract class FuncQueueNodeBase<T> : IRunnable
{
readonly TaskCompletionSource<T> promise;
readonly CancellationToken cancellationToken;
@ -176,7 +168,7 @@ namespace DotNetty.Common.Concurrency
public Task<T> Completion => this.promise.Task;
public override void Run()
public void Run()
{
if (this.cancellationToken.IsCancellationRequested)
{

Просмотреть файл

@ -5,7 +5,6 @@ namespace DotNetty.Common.Concurrency
{
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetty.Common.Utilities;
public sealed class ExecutorTaskScheduler : TaskScheduler
{
@ -46,7 +45,7 @@ namespace DotNetty.Common.Concurrency
protected override bool TryDequeue(Task task) => false;
sealed class TaskQueueNode : MpscLinkedQueueNode<IRunnable>, IRunnable
sealed class TaskQueueNode : IRunnable
{
readonly ExecutorTaskScheduler scheduler;
readonly Task task;
@ -57,8 +56,6 @@ namespace DotNetty.Common.Concurrency
this.task = task;
}
public override IRunnable Value => this;
public void Run() => this.scheduler.TryExecuteTask(this.task);
}
}

Просмотреть файл

@ -8,9 +8,8 @@ namespace DotNetty.Common.Concurrency
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Utilities;
abstract class ScheduledTask : MpscLinkedQueueNode<IRunnable>, IScheduledRunnable
abstract class ScheduledTask : IScheduledRunnable
{
const int CancellationProhibited = 1;
const int CancellationRequested = 1 << 1;
@ -54,8 +53,6 @@ namespace DotNetty.Common.Concurrency
return this.Deadline.CompareTo(other.Deadline);
}
public override IRunnable Value => this;
public virtual void Run()
{
if (this.TrySetUncancelable())

Просмотреть файл

@ -7,8 +7,8 @@ namespace DotNetty.Common.Concurrency
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal;
using DotNetty.Common.Internal.Logging;
using DotNetty.Common.Utilities;
public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
{
@ -26,7 +26,7 @@ namespace DotNetty.Common.Concurrency
static readonly IInternalLogger Logger =
InternalLoggerFactory.GetInstance<SingleThreadEventExecutor>();
readonly MpscLinkedQueue<IRunnable> taskQueue = new MpscLinkedQueue<IRunnable>();
readonly IQueue<IRunnable> taskQueue;
readonly Thread thread;
volatile int executionState = ST_NOT_STARTED;
readonly PreciseTimeSpan preciseBreakoutInterval;
@ -39,8 +39,14 @@ namespace DotNetty.Common.Concurrency
PreciseTimeSpan gracefulShutdownTimeout;
public SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval)
: this(threadName, breakoutInterval, new CompatibleConcurrentQueue<IRunnable>())
{
}
protected SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> taskQueue)
{
this.terminationCompletionSource = new TaskCompletionSource();
this.taskQueue = taskQueue;
this.preciseBreakoutInterval = PreciseTimeSpan.FromTimeSpan(breakoutInterval);
this.scheduler = new ExecutorTaskScheduler(this);
this.thread = new Thread(this.Loop)
@ -104,7 +110,7 @@ namespace DotNetty.Common.Concurrency
protected void WakeUp(bool inEventLoop)
{
if (!inEventLoop || this.executionState == ST_SHUTTING_DOWN)
if (!inEventLoop || (this.executionState == ST_SHUTTING_DOWN))
{
this.Execute(WAKEUP_TASK);
}
@ -203,7 +209,7 @@ namespace DotNetty.Common.Concurrency
PreciseTimeSpan nanoTime = PreciseTimeSpan.FromStart;
if (this.IsShutdown || nanoTime - this.gracefulShutdownStartTime > this.gracefulShutdownTimeout)
if (this.IsShutdown || (nanoTime - this.gracefulShutdownStartTime > this.gracefulShutdownTimeout))
{
return true;
}
@ -229,18 +235,18 @@ namespace DotNetty.Common.Concurrency
while (true)
{
int oldState = this.executionState;
if (oldState >= ST_SHUTTING_DOWN || Interlocked.CompareExchange(ref this.executionState, ST_SHUTTING_DOWN, oldState) == oldState)
if ((oldState >= ST_SHUTTING_DOWN) || (Interlocked.CompareExchange(ref this.executionState, ST_SHUTTING_DOWN, oldState) == oldState))
{
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && this.gracefulShutdownStartTime == PreciseTimeSpan.Zero)
if (success && (this.gracefulShutdownStartTime == PreciseTimeSpan.Zero))
{
Logger.Error(
$"Buggy {typeof(IEventExecutor).Name} implementation; {typeof(SingleThreadEventExecutor).Name}.ConfirmShutdown() must be called "
+ "before run() implementation terminates.");
+ "before run() implementation terminates.");
}
try
@ -356,33 +362,32 @@ namespace DotNetty.Common.Concurrency
return true;
}
void FetchFromScheduledTaskQueue()
bool FetchFromScheduledTaskQueue()
{
if (this.HasScheduledTasks())
PreciseTimeSpan nanoTime = PreciseTimeSpan.FromStart;
IScheduledRunnable scheduledTask = this.PollScheduledTask(nanoTime);
while (scheduledTask != null)
{
PreciseTimeSpan nanoTime = PreciseTimeSpan.FromStart;
while (true)
if (!this.taskQueue.TryEnqueue(scheduledTask))
{
IScheduledRunnable scheduledTask = this.PollScheduledTask(nanoTime);
if (scheduledTask == null)
{
break;
}
this.taskQueue.TryEnqueue(scheduledTask);
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
this.ScheduledTaskQueue.Enqueue(scheduledTask);
return false;
}
scheduledTask = this.PollScheduledTask(nanoTime);
}
return true;
}
IRunnable PollTask()
{
Contract.Assert(this.InEventLoop);
IRunnable task = this.taskQueue.Dequeue();
if (task == null)
IRunnable task;
if (!this.taskQueue.TryDequeue(out task))
{
this.emptyEvent.Reset();
if ((task = this.taskQueue.Dequeue()) == null && !this.IsShuttingDown) // revisit queue as producer might have put a task in meanwhile
if (!this.taskQueue.TryDequeue(out task) && !this.IsShuttingDown) // revisit queue as producer might have put a task in meanwhile
{
IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek();
if (nextScheduledTask != null)
@ -393,14 +398,14 @@ namespace DotNetty.Common.Concurrency
if (this.emptyEvent.Wait(wakeupTimeout.ToTimeSpan()))
{
// woken up before the next scheduled task was due
task = this.taskQueue.Dequeue();
this.taskQueue.TryDequeue(out task);
}
}
}
else
{
this.emptyEvent.Wait();
task = this.taskQueue.Dequeue();
this.taskQueue.TryDequeue(out task);
}
}
}

Просмотреть файл

@ -169,11 +169,12 @@
<Compile Include="FastThreadLocal.cs" />
<Compile Include="InternalThreadLocalMap.cs" />
<Compile Include="Internal\AbstractQueue.cs" />
<Compile Include="Internal\CompatibleConcurrentQueue.cs" />
<Compile Include="Internal\IQueue.cs" />
<Compile Include="Internal\ConcurrentCircularArrayQueue.cs" />
<Compile Include="Internal\MpscArrayQueue.cs" />
<Compile Include="Internal\OneTimeTask.cs" />
<Compile Include="Internal\PlatformDependent.cs" />
<Compile Include="Internal\RefArrayAccessUtil.cs" />
<Compile Include="Properties\Friends.cs" />
<Compile Include="Concurrency\AbstractEventExecutor.cs" />
<Compile Include="Concurrency\AbstractScheduledEventExecutor.cs" />
@ -212,10 +213,8 @@
<Compile Include="Utilities\IAttributeMap.cs" />
<Compile Include="Utilities\IConstant.cs" />
<Compile Include="Utilities\IntegerExtensions.cs" />
<Compile Include="Utilities\MpscLinkedQueue.cs" />
<Compile Include="Utilities\PriorityQueue.cs" />
<Compile Include="Utilities\RandomExtensions.cs" />
<Compile Include="Utilities\RecyclableMpscLinkedQueueNode.cs" />
<Compile Include="Utilities\ReferenceCountUtil.cs" />
<Compile Include="Internal\SystemPropertyUtil.cs" />
<Compile Include="Utilities\ReferenceEqualityComparer.cs" />

Просмотреть файл

@ -5,11 +5,11 @@ namespace DotNetty.Common.Internal
{
public abstract class AbstractQueue<T> : IQueue<T>
{
public abstract bool TryEnqueue(T element);
public abstract bool TryEnqueue(T item);
public abstract T Dequeue();
public abstract bool TryDequeue(out T item);
public abstract T Peek();
public abstract bool TryPeek(out T item);
public abstract int Count { get; }

Просмотреть файл

@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Internal
{
using System.Collections.Concurrent;
public sealed class CompatibleConcurrentQueue<T> : ConcurrentQueue<T>, IQueue<T>
{
public bool TryEnqueue(T element)
{
this.Enqueue(element);
return true;
}
void IQueue<T>.Clear()
{
T item;
while (this.TryDequeue(out item))
{
}
}
}
}

Просмотреть файл

@ -3,8 +3,6 @@
namespace DotNetty.Common.Internal
{
using System;
using System.Threading;
using DotNetty.Common.Utilities;
/// Forked from
@ -27,7 +25,6 @@ namespace DotNetty.Common.Internal
abstract class ConcurrentCircularArrayQueue<T> : ConcurrentCircularArrayQueueL0Pad<T>
where T : class
{
protected static readonly int RefBufferPad = (64 * 2) / IntPtr.Size;
protected long Mask;
protected readonly T[] Buffer;
@ -36,68 +33,37 @@ namespace DotNetty.Common.Internal
int actualCapacity = IntegerExtensions.RoundUpToPowerOfTwo(capacity);
this.Mask = actualCapacity - 1;
// pad data on either end with some empty slots.
this.Buffer = new T[actualCapacity + RefBufferPad * 2];
this.Buffer = new T[actualCapacity + RefArrayAccessUtil.RefBufferPad * 2];
}
/// @param index desirable element index
/// @return the offset in bytes within the array for a given index.
protected long CalcElementOffset(long index) => CalcElementOffset(index, this.Mask);
/// @param index desirable element index
/// @param mask
/// @return the offset in bytes within the array for a given index.
protected static long CalcElementOffset(long index, long mask) => RefBufferPad + (index & mask);
protected long CalcElementOffset(long index) => RefArrayAccessUtil.CalcElementOffset(index, this.Mask);
/// A plain store (no ordering/fences) of an element to a given offset
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @param e a kitty
protected void SpElement(long offset, T e) => SpElement(this.Buffer, offset, e);
/// A plain store (no ordering/fences) of an element to a given offset
/// @param buffer this.buffer
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @param e an orderly kitty
protected static void SpElement(T[] buffer, long offset, T e)
{
buffer[offset] = e;
}
protected void SpElement(long offset, T e) => RefArrayAccessUtil.SpElement(this.Buffer, offset, e);
/// An ordered store(store + StoreStore barrier) of an element to a given offset
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @param e an orderly kitty
protected void SoElement(long offset, T e) => SoElement(this.Buffer, offset, e);
/// An ordered store(store + StoreStore barrier) of an element to a given offset
/// @param buffer this.buffer
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @param e an orderly kitty
protected static void SoElement(T[] buffer, long offset, T e) => Volatile.Write(ref buffer[offset], e);
protected void SoElement(long offset, T e) => RefArrayAccessUtil.SoElement(this.Buffer, offset, e);
/// A plain load (no ordering/fences) of an element from a given offset.
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @return the element at the offset
protected T LpElement(long offset) => LpElement(this.Buffer, offset);
/// A plain load (no ordering/fences) of an element from a given offset.
/// @param buffer this.buffer
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @return the element at the offset
protected static T LpElement(T[] buffer, long offset) => buffer[offset];
protected T LpElement(long offset) => RefArrayAccessUtil.LpElement(this.Buffer, offset);
/// A volatile load (load + LoadLoad barrier) of an element from a given offset.
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @return the element at the offset
protected T LvElement(long offset) => LvElement(this.Buffer, offset);
/// A volatile load (load + LoadLoad barrier) of an element from a given offset.
/// @param buffer this.buffer
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @return the element at the offset
protected static T LvElement(T[] buffer, long offset) => Volatile.Read(ref buffer[offset]);
protected T LvElement(long offset) => RefArrayAccessUtil.LvElement(this.Buffer, offset);
public override void Clear()
{
while (this.Dequeue() != null || !this.IsEmpty)
T item;
while (this.TryDequeue(out item) || !this.IsEmpty)
{
// looping
}

Просмотреть файл

@ -5,11 +5,11 @@ namespace DotNetty.Common.Internal
{
public interface IQueue<T>
{
bool TryEnqueue(T element);
bool TryEnqueue(T item);
T Dequeue();
bool TryDequeue(out T item);
T Peek();
bool TryPeek(out T item);
int Count { get; }

Просмотреть файл

@ -75,7 +75,7 @@ namespace DotNetty.Common.Internal
// the index visibility to poll() we would need to handle the case where the element is not visible.
// Won CAS, move on to storing
long offset = CalcElementOffset(currentProducerIndex, mask);
long offset = RefArrayAccessUtil.CalcElementOffset(currentProducerIndex, mask);
this.SoElement(offset, e); // StoreStore
return true; // AWESOME :)
}
@ -112,7 +112,7 @@ namespace DotNetty.Common.Internal
}
// Won CAS, move on to storing
long offset = CalcElementOffset(currentTail, mask);
long offset = RefArrayAccessUtil.CalcElementOffset(currentTail, mask);
this.SoElement(offset, e);
return 0; // AWESOME :)
}
@ -123,7 +123,7 @@ namespace DotNetty.Common.Internal
/// <br />
/// Lock free poll using ordered loads/stores. As class name suggests access is limited to a single thread.
/// @see java.util.Queue#poll()
public override T Dequeue()
public override bool TryDequeue(out T item)
{
long consumerIndex = this.ConsumerIndex; // LoadLoad
long offset = this.CalcElementOffset(consumerIndex);
@ -131,7 +131,7 @@ namespace DotNetty.Common.Internal
T[] buffer = this.Buffer;
// If we can't see the next available element we can't poll
T e = LvElement(buffer, offset); // LoadLoad
T e = RefArrayAccessUtil.LvElement(buffer, offset); // LoadLoad
if (null == e)
{
// NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
@ -142,19 +142,21 @@ namespace DotNetty.Common.Internal
{
do
{
e = LvElement(buffer, offset);
e = RefArrayAccessUtil.LvElement(buffer, offset);
}
while (e == null);
}
else
{
return default(T);
item = default(T);
return false;
}
}
SpElement(buffer, offset, default(T));
RefArrayAccessUtil.SpElement(buffer, offset, default(T));
this.ConsumerIndex = consumerIndex + 1; // StoreStore
return e;
item = e;
return true;
}
/// {@inheritDoc}
@ -163,14 +165,14 @@ namespace DotNetty.Common.Internal
/// <br />
/// Lock free peek using ordered loads. As class name suggests access is limited to a single thread.
/// @see java.util.Queue#poll()
public override T Peek()
public override bool TryPeek(out T item)
{
// Copy field to avoid re-reading after volatile load
T[] buffer = this.Buffer;
long consumerIndex = this.ConsumerIndex; // LoadLoad
long offset = this.CalcElementOffset(consumerIndex);
T e = LvElement(buffer, offset);
T e = RefArrayAccessUtil.LvElement(buffer, offset);
if (null == e)
{
// NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
@ -181,16 +183,18 @@ namespace DotNetty.Common.Internal
{
do
{
e = LvElement(buffer, offset);
e = RefArrayAccessUtil.LvElement(buffer, offset);
}
while (e == null);
}
else
{
return default(T);
item = default(T);
return false;
}
}
return e;
item = e;
return true;
}
/// {@inheritDoc}

Просмотреть файл

@ -1,21 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Internal
{
using DotNetty.Common.Concurrency;
using DotNetty.Common.Utilities;
/// <summary>
/// <see cref="IRunnable" /> which represent a one time task which may allow the <see cref="IEventExecutor" /> to
/// reduce the amount of
/// produced garbage when queue it for execution.
/// <strong>It is important this will not be reused. After submitted it is not allowed to get submitted again!</strong>
/// </summary>
public abstract class OneTimeTask : MpscLinkedQueueNode<IRunnable>, IRunnable
{
public override IRunnable Value => this;
public abstract void Run();
}
}

Просмотреть файл

@ -3,20 +3,10 @@
namespace DotNetty.Common.Internal
{
using DotNetty.Common.Utilities;
public static class PlatformDependent
{
public static IQueue<T> NewFixedMpscQueue<T>(int capacity)
where T : class
{
return new MpscArrayQueue<T>(capacity);
}
public static IQueue<T> NewFixedMpscQueue<T>(int capacity) where T : class => new MpscArrayQueue<T>(capacity);
public static IQueue<T> NewMpscQueue<T>()
where T : class
{
return new MpscLinkedQueue<T>();
}
public static IQueue<T> NewMpscQueue<T>() where T : class => new CompatibleConcurrentQueue<T>();
}
}

Просмотреть файл

@ -0,0 +1,39 @@
namespace DotNetty.Common.Internal
{
using System;
using System.Threading;
static class RefArrayAccessUtil
{
public static readonly int RefBufferPad = 64 * 2 / IntPtr.Size;
/// A plain store (no ordering/fences) of an element to a given offset
/// @param buffer this.buffer
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @param e an orderly kitty
public static void SpElement<T>(T[] buffer, long offset, T e) => buffer[offset] = e;
/// An ordered store(store + StoreStore barrier) of an element to a given offset
/// @param buffer this.buffer
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @param e an orderly kitty
public static void SoElement<T>(T[] buffer, long offset, T e) where T : class => Volatile.Write(ref buffer[offset], e);
/// A plain load (no ordering/fences) of an element from a given offset.
/// @param buffer this.buffer
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @return the element at the offset
public static T LpElement<T>(T[] buffer, long offset) => buffer[offset];
/// A volatile load (load + LoadLoad barrier) of an element from a given offset.
/// @param buffer this.buffer
/// @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
/// @return the element at the offset
public static T LvElement<T>(T[] buffer, long offset) where T : class => Volatile.Read(ref buffer[offset]);
/// @param index desirable element index
/// @param mask
/// @return the offset in bytes within the array for a given index.
public static long CalcElementOffset(long index, long mask) => RefBufferPad + (index & mask);
}
}

Просмотреть файл

@ -143,8 +143,8 @@ namespace DotNetty.Common
{
for (;;)
{
Entry e = PendingEntries.Dequeue();
if (e == null)
Entry e;
if (!PendingEntries.TryDequeue(out e))
{
break;
}
@ -186,7 +186,7 @@ namespace DotNetty.Common
}
}
sealed class Entry : MpscLinkedQueueNode<Entry>
sealed class Entry
{
internal readonly Thread Thread;
internal readonly Action Task;
@ -199,8 +199,6 @@ namespace DotNetty.Common
this.IsWatch = isWatch;
}
public override Entry Value => this;
public override int GetHashCode() => this.Thread.GetHashCode() ^ this.Task.GetHashCode();
public override bool Equals(object obj)

Просмотреть файл

@ -1,237 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Utilities
{
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using DotNetty.Common.Internal;
sealed class MpscLinkedQueue<T> : MpscLinkedQueueTailRef<T>, IEnumerable<T>, IQueue<T>
where T : class
{
#pragma warning disable 169 // padded reference
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
#pragma warning restore 169
// offer() occurs at the tail of the linked list.
// poll() occurs at the head of the linked list.
//
// Resulting layout is:
//
// head --next--> 1st element --next--> 2nd element --next--> ... tail (last element)
//
// where the head is a dummy node whose value is null.
//
// offer() appends a new node next to the tail using AtomicReference.getAndSet()
// poll() removes head from the linked list and promotes the 1st element to the head,
// setting its value to null if possible.
public MpscLinkedQueue()
{
MpscLinkedQueueNode<T> tombstone = new DefaultNode(null);
this.HeadRef = tombstone;
this.TailRef = tombstone;
}
/// <summary>
/// Returns the node right next to the head, which contains the first element of this queue.
/// </summary>
MpscLinkedQueueNode<T> PeekNode()
{
MpscLinkedQueueNode<T> head = this.HeadRef;
MpscLinkedQueueNode<T> next = head.Next;
if (next == null && head != this.TailRef)
{
// if tail != head this is not going to change until consumer makes progress
// we can avoid reading the head and just spin on next until it shows up
//
// See https://github.com/akka/akka/pull/15596
do
{
next = head.Next;
}
while (next == null);
}
return next;
}
public bool TryEnqueue(T value)
{
Contract.Requires(value != null);
MpscLinkedQueueNode<T> newTail;
var node = value as MpscLinkedQueueNode<T>;
if (node != null)
{
newTail = node;
newTail.Next = null;
}
else
{
newTail = new DefaultNode(value);
}
MpscLinkedQueueNode<T> oldTail = this.GetAndSetTailRef(newTail);
oldTail.Next = newTail;
return true;
}
public T Dequeue()
{
MpscLinkedQueueNode<T> next = this.PeekNode();
if (next == null)
{
return null;
}
// next becomes a new head.
MpscLinkedQueueNode<T> oldHead = this.HeadRef;
// todo: research storestore vs loadstore barriers
// See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html
// See: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html
this.HeadRef = next;
// Break the linkage between the old head and the new head.
oldHead.Unlink();
return next.ClearMaybe();
}
public T Peek()
{
MpscLinkedQueueNode<T> next = this.PeekNode();
return next?.Value;
}
public int Count
{
get
{
int count = 0;
MpscLinkedQueueNode<T> n = this.PeekNode();
while (true)
{
if (n == null)
{
break;
}
count ++;
n = n.Next;
}
return count;
}
}
public bool IsEmpty => this.PeekNode() == null;
public IEnumerator<T> GetEnumerator()
{
MpscLinkedQueueNode<T> node = this.PeekNode();
while (node != null)
{
yield return node.Value;
node = node.Next;
}
}
IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator();
public void Clear()
{
while (this.Dequeue() != null)
{
}
}
class DefaultNode : MpscLinkedQueueNode<T>
{
T value;
internal DefaultNode(T value)
{
this.value = value;
}
public override T Value => this.value;
protected internal override T ClearMaybe()
{
T v = this.value;
this.value = null;
return v;
}
}
}
abstract class MpscLinkedQueueHeadRef<T> : MpscLinkedQueuePad0
{
MpscLinkedQueueNode<T> headRef;
protected MpscLinkedQueueNode<T> HeadRef
{
get { return Volatile.Read(ref this.headRef); }
set { Volatile.Write(ref this.headRef, value); }
}
}
public abstract class MpscLinkedQueueNode<T>
{
volatile MpscLinkedQueueNode<T> next;
internal MpscLinkedQueueNode<T> Next
{
get { return this.next; }
set { this.next = value; }
}
public abstract T Value { get; }
/// <summary>
/// Sets the element this node contains to <code>null</code> so that the node can be used as a tombstone.
/// </summary>
/// <returns></returns>
protected internal virtual T ClearMaybe() => this.Value;
/// <summary>
/// Unlink to allow GC
/// </summary>
internal virtual void Unlink() => this.Next = null;
}
abstract class MpscLinkedQueuePad0
{
#pragma warning disable 169 // padded reference
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
#pragma warning restore 169
}
abstract class MpscLinkedQueuePad1<T> : MpscLinkedQueueHeadRef<T>
{
#pragma warning disable 169 // padded reference
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
#pragma warning restore 169
}
abstract class MpscLinkedQueueTailRef<T> : MpscLinkedQueuePad1<T>
{
MpscLinkedQueueNode<T> tailRef;
protected MpscLinkedQueueNode<T> TailRef
{
get { return Volatile.Read(ref this.tailRef); }
set { Volatile.Write(ref this.tailRef, value); }
}
protected MpscLinkedQueueNode<T> GetAndSetTailRef(MpscLinkedQueueNode<T> value)
{
#pragma warning disable 420
return Interlocked.Exchange(ref this.tailRef, value);
#pragma warning restore 420
}
}
}

Просмотреть файл

@ -1,24 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Utilities
{
using System.Diagnostics.Contracts;
public abstract class RecyclableMpscLinkedQueueNode<T> : MpscLinkedQueueNode<T>
{
readonly ThreadLocalPool.Handle handle;
protected RecyclableMpscLinkedQueueNode(ThreadLocalPool.Handle handle)
{
Contract.Requires(handle != null);
this.handle = handle;
}
internal override void Unlink()
{
base.Unlink();
this.handle.Release(this);
}
}
}

Просмотреть файл

@ -8,7 +8,6 @@ namespace DotNetty.Handlers.Timeout
using System.Threading.Tasks;
using DotNetty.Common.Utilities;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Internal;
using DotNetty.Transport.Channels;
/// <summary>
@ -142,13 +141,13 @@ namespace DotNetty.Handlers.Timeout
}
}
sealed class WriteTimeoutTask : OneTimeTask
sealed class WriteTimeoutTask : IRunnable
{
readonly WriteTimeoutHandler handler;
readonly IChannelHandlerContext context;
readonly Task future;
readonly static Action<Task, object> OperationCompleteAction = HandleOperationComplete;
static readonly Action<Task, object> OperationCompleteAction = HandleOperationComplete;
public WriteTimeoutTask(IChannelHandlerContext context, Task future, WriteTimeoutHandler handler)
{
@ -169,9 +168,7 @@ namespace DotNetty.Handlers.Timeout
public IScheduledTask ScheduledTask { get; set; }
public Task Future => this.future;
public override void Run()
public void Run()
{
if (!this.future.IsCompleted)
{
@ -181,7 +178,7 @@ namespace DotNetty.Handlers.Timeout
}
catch (Exception ex)
{
context.FireExceptionCaught(ex);
this.context.FireExceptionCaught(ex);
}
}

Просмотреть файл

@ -976,7 +976,7 @@ namespace DotNetty.Transport.Channels
public override string ToString() => $"{typeof(IChannelHandlerContext).Name} ({this.Name}, {this.Channel})";
abstract class AbstractWriteTask : RecyclableMpscLinkedQueueNode<IRunnable>, IRunnable
abstract class AbstractWriteTask : IRunnable
{
static readonly bool EstimateTaskSizeOnSubmit =
SystemPropertyUtil.GetBoolean("io.netty.transport.estimateSizeOnSubmit", true);
@ -985,6 +985,7 @@ namespace DotNetty.Transport.Channels
static readonly int WriteTaskOverhead =
SystemPropertyUtil.GetInt("io.netty.transport.writeTaskSizeOverhead", 56);
ThreadLocalPool.Handle handle;
AbstractChannelHandlerContext ctx;
object msg;
TaskCompletionSource promise;
@ -1018,8 +1019,8 @@ namespace DotNetty.Transport.Channels
}
protected AbstractWriteTask(ThreadLocalPool.Handle handle)
: base(handle)
{
this.handle = handle;
}
public void Run()
@ -1040,11 +1041,10 @@ namespace DotNetty.Transport.Channels
this.ctx = null;
this.msg = null;
this.promise = null;
this.handle.Release(this);
}
}
public override IRunnable Value => this;
protected virtual Task WriteAsync(AbstractChannelHandlerContext ctx, object msg) => ctx.InvokeWriteAsync(msg);
}
sealed class WriteTask : AbstractWriteTask {

Просмотреть файл

@ -14,7 +14,6 @@ namespace DotNetty.Transport.Channels
using System.Threading.Tasks;
using DotNetty.Common;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Internal;
using DotNetty.Common.Internal.Logging;
using DotNetty.Common.Utilities;
@ -1182,7 +1181,7 @@ namespace DotNetty.Transport.Channels
public void ChannelWritabilityChanged(IChannelHandlerContext context) => context.FireChannelWritabilityChanged();
}
abstract class PendingHandlerCallback : OneTimeTask
abstract class PendingHandlerCallback : IRunnable
{
protected readonly DefaultChannelPipeline Pipeline;
protected readonly AbstractChannelHandlerContext Ctx;
@ -1194,6 +1193,8 @@ namespace DotNetty.Transport.Channels
this.Ctx = ctx;
}
public abstract void Run();
internal abstract void Execute();
}

Просмотреть файл

@ -6,6 +6,7 @@ namespace DotNetty.Transport.Channels
using System;
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Internal;
public class SingleThreadEventLoop : SingleThreadEventExecutor, IEventLoop
{
@ -26,6 +27,11 @@ namespace DotNetty.Transport.Channels
{
}
protected SingleThreadEventLoop(string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> taskQueue)
: base(threadName, breakoutInterval, taskQueue)
{
}
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);
}
}

Просмотреть файл

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<runtime>
<gcServer enabled="true" />
</runtime>
</configuration>

Просмотреть файл

@ -7,12 +7,14 @@ namespace DotNetty.Microbench
using System.Collections.Generic;
using System.Threading;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Internal;
using DotNetty.Microbench.Utilities;
using Xunit;
using Xunit.Abstractions;
public class BenchTests
{
const int Iterations = 10 * 1000 * 1000;
readonly ITestOutputHelper output;
public BenchTests(ITestOutputHelper output)
@ -23,35 +25,19 @@ namespace DotNetty.Microbench
[Fact]
public void BenchSingleThreadEventExecutorWaiting()
{
const int Iterations = 10 * 1000 * 1000;
var testSubjects = new Dictionary<string, IEventExecutor>
{
{ "MRES", new SingleThreadEventExecutor("MRES", TimeSpan.FromSeconds(1)) },
//{ "Semaphore", new SingleThreadEventExecutorOld("Semaphore", TimeSpan.FromSeconds(1)) }
{ "CompatibleConcurrentQueue", new TestExecutor("ConcurrentQueueCustom", TimeSpan.FromSeconds(1), new CompatibleConcurrentQueue<IRunnable>()) },
{ "ArrayQueue", new TestExecutor("ArrayQueue", TimeSpan.FromSeconds(1), PlatformDependent.NewFixedMpscQueue<IRunnable>(1 * 1000 * 1000)) }
};
var mre = new ManualResetEvent(false);
Action<object, object> action = null;
action = (s, i) =>
{
var container = (Container<int>)i;
if (container.Value < Iterations)
{
container.Value++;
((IEventExecutor)s).Execute(action, s, container);
}
else
{
mre.Set();
}
};
CodeTimer.Benchmark(testSubjects, "STEE in loop ({0})", 1, this.output,
scheduler =>
{
scheduler.Execute(action, scheduler, new Container<int>());
var action = new BenchActionIn(scheduler, mre);
scheduler.Execute(action);
if (!mre.WaitOne(TimeSpan.FromMinutes(1)))
{
@ -60,22 +46,13 @@ namespace DotNetty.Microbench
mre.Reset();
});
Action<object> execFromOutsideAction = i =>
{
var container = (Container<int>)i;
if (++container.Value >= Iterations)
{
mre.Set();
}
};
CodeTimer.Benchmark(testSubjects, "STEE out of loop ({0})", 1, this.output,
scheduler =>
{
var container = new Container<int>();
var action = new BenchActionOut(mre);
for (int i = 0; i < Iterations; i++)
{
scheduler.Execute(execFromOutsideAction, container);
scheduler.Execute(action);
}
if (!mre.WaitOne(TimeSpan.FromMinutes(1)))
@ -91,6 +68,58 @@ namespace DotNetty.Microbench
}
}
sealed class TestExecutor : SingleThreadEventExecutor
{
public TestExecutor(string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> queue)
: base(threadName, breakoutInterval, queue)
{
}
}
sealed class BenchActionIn : IRunnable
{
int value;
readonly IEventExecutor executor;
readonly ManualResetEvent evt;
public BenchActionIn(IEventExecutor executor, ManualResetEvent evt)
{
this.executor = executor;
this.evt = evt;
}
public void Run()
{
if (++this.value < Iterations)
{
this.executor.Execute(this);
}
else
{
this.evt.Set();
}
}
}
sealed class BenchActionOut : IRunnable
{
int value;
readonly ManualResetEvent evt;
public BenchActionOut(ManualResetEvent evt)
{
this.evt = evt;
}
public void Run()
{
if (++this.value >= Iterations)
{
this.evt.Set();
}
}
}
public class Container<T>
{
public T Value;

Просмотреть файл

@ -73,6 +73,7 @@
<Compile Include="Utilities\CycleTime.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>