This commit is contained in:
Max Gortman 2021-07-16 18:26:30 -07:00 коммит произвёл Max Gortman
Родитель 1cdaadbfaa
Коммит 05e5aa40c3
15 изменённых файлов: 14 добавлений и 131 удалений

Просмотреть файл

@ -6,10 +6,10 @@ namespace DotNetty.Buffers
using System;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Threading;
using DotNetty.Common;
using DotNetty.Common.Internal;
using DotNetty.Common.Internal.Logging;
using Thread = DotNetty.Common.Concurrency.XThread;
/// <summary>
/// Acts a Thread cache for allocations. This implementation is moduled after

Просмотреть файл

@ -8,7 +8,6 @@ namespace DotNetty.Common.Concurrency
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal.Logging;
using Thread = XThread;
/// <summary>
/// Abstract base class for <see cref="IEventExecutor" /> implementations

Просмотреть файл

@ -3,8 +3,8 @@
namespace DotNetty.Common.Concurrency
{
using Thread = DotNetty.Common.Concurrency.XThread;
using System.Threading;
public interface IEventExecutor : IEventExecutorGroup
{
/// <summary>

Просмотреть файл

@ -12,7 +12,6 @@ namespace DotNetty.Common.Concurrency
using System.Threading.Tasks;
using DotNetty.Common.Internal;
using DotNetty.Common.Internal.Logging;
using Thread = XThread;
/// <summary>
/// <see cref="IEventExecutor"/> backed by a single thread.

Просмотреть файл

@ -1,107 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
public delegate void XParameterizedThreadStart(object obj);
[DebuggerDisplay("ID={threadId}, Name={Name}, IsExplicit={isExplicit}")]
public sealed class XThread
{
static int maxThreadId;
[ThreadStatic]
static XThread currentThread;
readonly int threadId;
#pragma warning disable CS0414
readonly bool isExplicit; // For debugging only
#pragma warning restore CS0414
Task task;
readonly EventWaitHandle completed = new EventWaitHandle(false, EventResetMode.AutoReset);
readonly EventWaitHandle readyToStart = new EventWaitHandle(false, EventResetMode.AutoReset);
object startupParameter;
static int GetNewThreadId() => Interlocked.Increment(ref maxThreadId);
XThread()
{
this.threadId = GetNewThreadId();
this.isExplicit = false;
this.IsAlive = false;
}
public XThread(Action action)
{
this.threadId = GetNewThreadId();
this.isExplicit = true;
this.IsAlive = false;
this.CreateLongRunningTask(x => action());
}
public XThread(XParameterizedThreadStart threadStartFunc)
{
this.threadId = GetNewThreadId();
this.isExplicit = true;
this.IsAlive = false;
this.CreateLongRunningTask(threadStartFunc);
}
public void Start()
{
this.readyToStart.Set();
this.IsAlive = true;
}
void CreateLongRunningTask(XParameterizedThreadStart threadStartFunc)
{
this.task = Task.Factory.StartNew(
() =>
{
// We start the task running, then unleash it by signaling the readyToStart event.
// This is needed to avoid thread reuse for tasks (see below)
this.readyToStart.WaitOne();
// This is the first time we're using this thread, therefore the TLS slot must be empty
if (currentThread != null)
{
Debug.WriteLine("warning: currentThread already created; OS thread reused");
Debug.Assert(false);
}
currentThread = this;
threadStartFunc(this.startupParameter);
this.completed.Set();
},
CancellationToken.None,
// .NET always creates a brand new thread for LongRunning tasks
// This is not documented but unlikely to ever change:
// https://github.com/dotnet/corefx/issues/2576#issuecomment-126693306
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
public void Start(object parameter)
{
this.startupParameter = parameter;
this.Start();
}
public static void Sleep(int millisecondsTimeout) => Task.Delay(millisecondsTimeout).Wait();
public int Id => this.threadId;
public string Name { get; set; }
public bool IsAlive { get; private set; }
public static XThread CurrentThread => currentThread ?? (currentThread = new XThread());
public bool Join(TimeSpan timeout) => this.completed.WaitOne(timeout);
public bool Join(int millisecondsTimeout) => this.completed.WaitOne(millisecondsTimeout);
}
}

Просмотреть файл

@ -10,7 +10,6 @@ namespace DotNetty.Common
using DotNetty.Common.Concurrency;
using DotNetty.Common.Internal;
using DotNetty.Common.Internal.Logging;
using Thread = DotNetty.Common.Concurrency.XThread;
public static class ThreadDeathWatcher
{

Просмотреть файл

@ -10,7 +10,6 @@ namespace DotNetty.Common
using System.Diagnostics.Contracts;
using System.Runtime.CompilerServices;
using System.Threading;
using Thread = DotNetty.Common.Concurrency.XThread;
public class ThreadLocalPool
{

Просмотреть файл

@ -27,7 +27,7 @@ namespace DotNetty.Common.Utilities
const int InstanceCountLimit = 64;
readonly Worker worker;
readonly XThread workerThread;
readonly Thread workerThread;
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
const int WorkerStateInit = 0;
@ -98,7 +98,7 @@ namespace DotNetty.Common.Utilities
tickInterval,
long.MaxValue / this.wheel.Length));
}
this.workerThread = new XThread(st => this.worker.Run());
this.workerThread = new Thread(st => this.worker.Run());
this.maxPendingTimeouts = maxPendingTimeouts;
@ -187,7 +187,7 @@ namespace DotNetty.Common.Utilities
{
GC.SuppressFinalize(this);
if (XThread.CurrentThread == this.workerThread)
if (Thread.CurrentThread == this.workerThread)
{
throw new InvalidOperationException($"{nameof(HashedWheelTimer)}.stop() cannot be called from timer task.");
}

Просмотреть файл

@ -6,7 +6,6 @@ namespace DotNetty.Common.Utilities
using System;
using System.Threading;
using DotNetty.Common.Internal.Logging;
using Thread = DotNetty.Common.Concurrency.XThread;
public static class ReferenceCountUtil
{

Просмотреть файл

@ -6,7 +6,6 @@ namespace DotNetty.Common.Utilities
using System;
using System.Diagnostics.Contracts;
using System.Threading;
using Thread = DotNetty.Common.Concurrency.XThread;
public static class ThreadExtensions
{

Просмотреть файл

@ -74,7 +74,7 @@ namespace DotNetty.Transport.Libuv
public override IEventExecutor GetNext()
{
// Attempt to select event loop based on thread first
int threadId = XThread.CurrentThread.Id;
int threadId = Thread.CurrentThread.ManagedThreadId;
int i;
for (i = 0; i < this.eventLoops.Length; i++)
{

Просмотреть файл

@ -36,7 +36,7 @@ namespace DotNetty.Transport.Libuv
readonly ThreadLocalPool<WriteRequest> writeRequestPool = new ThreadLocalPool<WriteRequest>(handle => new WriteRequest(handle));
readonly long preciseBreakoutInterval;
readonly IQueue<IRunnable> taskQueue;
readonly XThread thread;
readonly Thread thread;
readonly TaskScheduler scheduler;
readonly ManualResetEventSlim loopRunStart;
readonly TaskCompletionSource terminationCompletionSource;
@ -80,7 +80,7 @@ namespace DotNetty.Transport.Libuv
{
name = $"{name}({threadName})";
}
this.thread = new XThread(Run) { Name = name };
this.thread = new Thread(Run) { Name = name };
this.loopRunStart = new ManualResetEventSlim(false, 1);
}
@ -97,7 +97,7 @@ namespace DotNetty.Transport.Libuv
internal Loop UnsafeLoop => this.loop;
internal int LoopThreadId => this.thread.Id;
internal int LoopThreadId => this.thread.ManagedThreadId;
static void Run(object state)
{
@ -424,7 +424,7 @@ namespace DotNetty.Transport.Libuv
public override bool IsTerminated => this.executionState == TerminatedState;
public override bool IsInEventLoop(XThread t) => this.thread == t;
public override bool IsInEventLoop(Thread t) => this.thread == t;
void WakeUp(bool inEventLoop)
{

Просмотреть файл

@ -16,7 +16,6 @@ namespace DotNetty.Transport.Channels
using DotNetty.Common.Concurrency;
using DotNetty.Common.Internal.Logging;
using DotNetty.Common.Utilities;
using Thread = DotNetty.Common.Concurrency.XThread;
public class DefaultChannelPipeline : IChannelPipeline
{

Просмотреть файл

@ -5,10 +5,10 @@ namespace DotNetty.Transport.Channels.Embedded
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common;
using DotNetty.Common.Concurrency;
using Thread = DotNetty.Common.Concurrency.XThread;
sealed class EmbeddedEventLoop : AbstractScheduledEventExecutor, IEventLoop
{

Просмотреть файл

@ -23,14 +23,13 @@ namespace DotNetty.Common.Tests
ThreadLocalPool<HandledObject> pool = NewPool(1024);
HandledObject reference = null;
WeakReference<Thread> threadRef = null;
WeakReference<XThread> xThreadRef = null;
var thread1 = new Thread(() =>
{
//Don't know the reason, but thread2 will not be collected without wrapped with thread1
var thread2 = new Thread(() =>
{
Volatile.Write(ref xThreadRef, new WeakReference<XThread>(XThread.CurrentThread));
Volatile.Write(ref threadRef, new WeakReference<Thread>(Thread.CurrentThread));
HandledObject data = pool.Take();
// Store a reference to the HandledObject to ensure it is not collected when the run method finish.
Volatile.Write(ref reference, data);
@ -39,7 +38,6 @@ namespace DotNetty.Common.Tests
thread2.Start();
thread2.Join();
Assert.True(Volatile.Read(ref threadRef)?.TryGetTarget(out _));
Assert.True(Volatile.Read(ref xThreadRef)?.TryGetTarget(out _));
GC.KeepAlive(thread2);
// Null out so it can be collected.
@ -53,12 +51,11 @@ namespace DotNetty.Common.Tests
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
GC.WaitForPendingFinalizers();
if (Volatile.Read(ref threadRef)?.TryGetTarget(out _) == true || Volatile.Read(ref xThreadRef)?.TryGetTarget(out _) == true)
if (Volatile.Read(ref threadRef)?.TryGetTarget(out _) == true || Volatile.Read(ref threadRef)?.TryGetTarget(out _) == true)
Thread.Sleep(100);
}
Assert.False(Volatile.Read(ref threadRef)?.TryGetTarget(out _));
Assert.False(Volatile.Read(ref xThreadRef)?.TryGetTarget(out _));
// Now call recycle after the Thread was collected to ensure this still works...
reference.Release();