align with java executor services (#402)

This commit is contained in:
Onat Yiğit Mercan 2018-08-14 18:46:18 +01:00 коммит произвёл Max Gortman
Родитель 797629ac1b
Коммит a307801e84
20 изменённых файлов: 653 добавлений и 549 удалений

Просмотреть файл

@ -12,7 +12,7 @@ namespace DotNetty.Common.Concurrency
/// <summary>
/// Abstract base class for <see cref="IEventExecutor" /> implementations
/// </summary>
public abstract class AbstractEventExecutor : IEventExecutor
public abstract class AbstractEventExecutor : AbstractExecutorService, IEventExecutor
{
static readonly IInternalLogger Logger = InternalLoggerFactory.GetInstance<AbstractEventExecutor>();
@ -31,127 +31,87 @@ namespace DotNetty.Common.Concurrency
this.Parent = parent;
}
/// <inheritdoc cref="IEventExecutor"/>
public bool InEventLoop => this.IsInEventLoop(Thread.CurrentThread);
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IEventExecutorGroup"/>
public abstract bool IsShuttingDown { get; }
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IEventExecutorGroup"/>
public abstract Task TerminationCompletion { get; }
/// <inheritdoc cref="IEventExecutor"/>
public abstract bool IsShutdown { get; }
/// <inheritdoc cref="IEventExecutor"/>
public abstract bool IsTerminated { get; }
/// <inheritdoc cref="IEventExecutorGroup"/>
public IEventExecutor GetNext() => this;
/// <inheritdoc cref="IEventExecutor"/>
public IEventExecutorGroup Parent { get; }
/// <inheritdoc cref="IEventExecutor"/>
public bool InEventLoop => this.IsInEventLoop(Thread.CurrentThread);
/// <inheritdoc cref="IEventExecutor"/>
public abstract bool IsInEventLoop(Thread thread);
/// <inheritdoc cref="IEventExecutor"/>
public abstract void Execute(IRunnable task);
/// <inheritdoc cref="IEventExecutor"/>
public void Execute(Action<object> action, object state) => this.Execute(new StateActionTaskQueueNode(action, state));
/// <inheritdoc cref="IEventExecutor"/>
public void Execute(Action<object, object> action, object context, object state) => this.Execute(new StateActionWithContextTaskQueueNode(action, context, state));
/// <inheritdoc cref="IEventExecutor"/>
public void Execute(Action action) => this.Execute(new ActionTaskQueueNode(action));
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual IScheduledTask Schedule(IRunnable action, TimeSpan delay)
{
throw new NotSupportedException();
}
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual IScheduledTask Schedule(Action action, TimeSpan delay)
{
throw new NotSupportedException();
}
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
throw new NotSupportedException();
}
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
throw new NotSupportedException();
}
/// <inheritdoc cref="IEventExecutor"/>
public virtual Task ScheduleAsync(Action action, TimeSpan delay) => this.ScheduleAsync(action, delay, CancellationToken.None);
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual Task ScheduleAsync(Action action, TimeSpan delay) =>
this.ScheduleAsync(action, delay, CancellationToken.None);
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
{
throw new NotSupportedException();
}
/// <inheritdoc cref="IEventExecutor"/>
public virtual Task ScheduleAsync(Action<object> action, object state, TimeSpan delay) => this.ScheduleAsync(action, state, delay, CancellationToken.None);
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual Task ScheduleAsync(Action<object> action, object state, TimeSpan delay) =>
this.ScheduleAsync(action, state, delay, CancellationToken.None);
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
{
throw new NotSupportedException();
}
/// <inheritdoc cref="IEventExecutor"/>
public virtual Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay) => this.ScheduleAsync(action, context, state, delay, CancellationToken.None);
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay) =>
this.ScheduleAsync(action, context, state, delay, CancellationToken.None);
/// <inheritdoc cref="IEventExecutor"/>
public virtual Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
/// <inheritdoc cref="IScheduledExecutorService"/>
public virtual Task ScheduleAsync(
Action<object, object> action,
object context,
object state,
TimeSpan delay,
CancellationToken cancellationToken)
{
throw new NotSupportedException();
}
/// <inheritdoc cref="IEventExecutor"/>
public Task<T> SubmitAsync<T>(Func<T> func) => this.SubmitAsync(func, CancellationToken.None);
/// <inheritdoc cref="IEventExecutor"/>
public Task<T> SubmitAsync<T>(Func<T> func, CancellationToken cancellationToken)
{
var node = new FuncSubmitQueueNode<T>(func, cancellationToken);
this.Execute(node);
return node.Completion;
}
/// <inheritdoc cref="IEventExecutor"/>
public Task<T> SubmitAsync<T>(Func<object, T> func, object state) => this.SubmitAsync(func, state, CancellationToken.None);
/// <inheritdoc cref="IEventExecutor"/>
public Task<T> SubmitAsync<T>(Func<object, T> func, object state, CancellationToken cancellationToken)
{
var node = new StateFuncSubmitQueueNode<T>(func, state, cancellationToken);
this.Execute(node);
return node.Completion;
}
/// <inheritdoc cref="IEventExecutor"/>
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state) => this.SubmitAsync(func, context, state, CancellationToken.None);
/// <inheritdoc cref="IEventExecutor"/>
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state, CancellationToken cancellationToken)
{
var node = new StateFuncWithContextSubmitQueueNode<T>(func, context, state, cancellationToken);
this.Execute(node);
return node.Completion;
}
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IScheduledExecutorService"/>
public Task ShutdownGracefullyAsync() => this.ShutdownGracefullyAsync(DefaultShutdownQuietPeriod, DefaultShutdownTimeout);
/// <inheritdoc cref="IEventExecutor"/>
/// <inheritdoc cref="IScheduledExecutorService"/>
public abstract Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);
/// <inheritdoc cref="IEventExecutor"/>
@ -168,128 +128,5 @@ namespace DotNetty.Common.Concurrency
Logger.Warn("A task raised an exception. Task: {}", task, ex);
}
}
#region Queuing data structures
sealed class ActionTaskQueueNode : IRunnable
{
readonly Action action;
public ActionTaskQueueNode(Action action)
{
this.action = action;
}
public void Run() => this.action();
}
sealed class StateActionTaskQueueNode : IRunnable
{
readonly Action<object> action;
readonly object state;
public StateActionTaskQueueNode(Action<object> action, object state)
{
this.action = action;
this.state = state;
}
public void Run() => this.action(this.state);
}
sealed class StateActionWithContextTaskQueueNode : IRunnable
{
readonly Action<object, object> action;
readonly object context;
readonly object state;
public StateActionWithContextTaskQueueNode(Action<object, object> action, object context, object state)
{
this.action = action;
this.context = context;
this.state = state;
}
public void Run() => this.action(this.context, this.state);
}
abstract class FuncQueueNodeBase<T> : IRunnable
{
readonly TaskCompletionSource<T> promise;
readonly CancellationToken cancellationToken;
protected FuncQueueNodeBase(TaskCompletionSource<T> promise, CancellationToken cancellationToken)
{
this.promise = promise;
this.cancellationToken = cancellationToken;
}
public Task<T> Completion => this.promise.Task;
public void Run()
{
if (this.cancellationToken.IsCancellationRequested)
{
this.promise.TrySetCanceled();
return;
}
try
{
T result = this.Call();
this.promise.TrySetResult(result);
}
catch (Exception ex)
{
// todo: handle fatal
this.promise.TrySetException(ex);
}
}
protected abstract T Call();
}
sealed class FuncSubmitQueueNode<T> : FuncQueueNodeBase<T>
{
readonly Func<T> func;
public FuncSubmitQueueNode(Func<T> func, CancellationToken cancellationToken)
: base(new TaskCompletionSource<T>(), cancellationToken)
{
this.func = func;
}
protected override T Call() => this.func();
}
sealed class StateFuncSubmitQueueNode<T> : FuncQueueNodeBase<T>
{
readonly Func<object, T> func;
public StateFuncSubmitQueueNode(Func<object, T> func, object state, CancellationToken cancellationToken)
: base(new TaskCompletionSource<T>(state), cancellationToken)
{
this.func = func;
}
protected override T Call() => this.func(this.Completion.AsyncState);
}
sealed class StateFuncWithContextSubmitQueueNode<T> : FuncQueueNodeBase<T>
{
readonly Func<object, object, T> func;
readonly object context;
public StateFuncWithContextSubmitQueueNode(Func<object, object, T> func, object context, object state, CancellationToken cancellationToken)
: base(new TaskCompletionSource<T>(state), cancellationToken)
{
this.func = func;
this.context = context;
}
protected override T Call() => this.func(this.context, this.Completion.AsyncState);
}
#endregion
}
}

Просмотреть файл

@ -0,0 +1,69 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;
using System.Threading.Tasks;
public abstract class AbstractEventExecutorGroup : IEventExecutorGroup
{
static readonly TimeSpan DefaultShutdownQuietPeriod = TimeSpan.FromSeconds(2);
static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(15);
public abstract bool IsShutdown { get; }
public abstract bool IsTerminated { get; }
public abstract bool IsShuttingDown { get; }
public abstract Task TerminationCompletion { get; }
public abstract IEventExecutor GetNext();
public void Execute(IRunnable task) => this.GetNext().Execute(task);
public void Execute(Action<object> action, object state) => this.GetNext().Execute(action, state);
public void Execute(Action action) => this.GetNext().Execute(action);
public void Execute(Action<object, object> action, object context, object state) => this.GetNext().Execute(action, context, state);
public Task<T> SubmitAsync<T>(Func<T> func) => this.GetNext().SubmitAsync(func);
public Task<T> SubmitAsync<T>(Func<T> func, CancellationToken cancellationToken) => this.GetNext().SubmitAsync(func, cancellationToken);
public Task<T> SubmitAsync<T>(Func<object, T> func, object state) => GetNext().SubmitAsync(func, state);
public Task<T> SubmitAsync<T>(Func<object, T> func, object state, CancellationToken cancellationToken) => this.GetNext().SubmitAsync(func, state, cancellationToken);
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state) => this.GetNext().SubmitAsync(func, context, state);
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state, CancellationToken cancellationToken) => this.GetNext().SubmitAsync(func, context, cancellationToken);
public IScheduledTask Schedule(IRunnable action, TimeSpan delay) => this.GetNext().Schedule(action, delay);
public IScheduledTask Schedule(Action action, TimeSpan delay) => this.GetNext().Schedule(action, delay);
public IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay) => this.GetNext().Schedule(action, state, delay);
public IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay) => this.GetNext().Schedule(action, context, state, delay);
public Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken) => this.GetNext().ScheduleAsync(action, state, delay, cancellationToken);
public Task ScheduleAsync(Action<object> action, object state, TimeSpan delay) => this.GetNext().ScheduleAsync(action, state, delay);
public Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken) => this.GetNext().ScheduleAsync(action, delay, cancellationToken);
public Task ScheduleAsync(Action action, TimeSpan delay) => this.GetNext().ScheduleAsync(action, delay);
public Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay) => this.GetNext().ScheduleAsync(action, context, state, delay);
public Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken) => this.GetNext().ScheduleAsync(action, context, state, delay);
public Task ShutdownGracefullyAsync() => this.ShutdownGracefullyAsync(DefaultShutdownQuietPeriod, DefaultShutdownTimeout);
public abstract Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);
}
}

Просмотреть файл

@ -0,0 +1,195 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;
using System.Threading.Tasks;
public abstract class AbstractExecutorService : IExecutorService
{
/// <inheritdoc cref="IExecutorService"/>
public abstract bool IsShutdown { get; }
/// <inheritdoc cref="IExecutorService"/>
public abstract bool IsTerminated { get; }
/// <inheritdoc cref="IExecutorService"/>
public Task<T> SubmitAsync<T>(Func<T> func) => this.SubmitAsync(func, CancellationToken.None);
/// <inheritdoc cref="IExecutorService"/>
public Task<T> SubmitAsync<T>(Func<T> func, CancellationToken cancellationToken)
{
var node = new FuncSubmitQueueNode<T>(func, cancellationToken);
this.Execute(node);
return node.Completion;
}
/// <inheritdoc cref="IExecutorService"/>
public Task<T> SubmitAsync<T>(Func<object, T> func, object state) => this.SubmitAsync(func, state, CancellationToken.None);
/// <inheritdoc cref="IExecutorService"/>
public Task<T> SubmitAsync<T>(Func<object, T> func, object state, CancellationToken cancellationToken)
{
var node = new StateFuncSubmitQueueNode<T>(func, state, cancellationToken);
this.Execute(node);
return node.Completion;
}
/// <inheritdoc cref="IExecutorService"/>
public Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state) =>
this.SubmitAsync(func, context, state, CancellationToken.None);
/// <inheritdoc cref="IExecutorService"/>
public Task<T> SubmitAsync<T>(
Func<object, object, T> func,
object context,
object state,
CancellationToken cancellationToken)
{
var node = new StateFuncWithContextSubmitQueueNode<T>(func, context, state, cancellationToken);
this.Execute(node);
return node.Completion;
}
/// <inheritdoc cref="IExecutor"/>
public abstract void Execute(IRunnable task);
/// <inheritdoc cref="IExecutor"/>
public void Execute(Action<object> action, object state) => this.Execute(new StateActionTaskQueueNode(action, state));
/// <inheritdoc cref="IExecutor"/>
public void Execute(Action<object, object> action, object context, object state) => this.Execute(new StateActionWithContextTaskQueueNode(action, context, state));
/// <inheritdoc cref="IExecutor"/>
public void Execute(Action action) => this.Execute(new ActionTaskQueueNode(action));
#region Queuing data structures
sealed class ActionTaskQueueNode : IRunnable
{
readonly Action action;
public ActionTaskQueueNode(Action action)
{
this.action = action;
}
public void Run() => this.action();
}
sealed class StateActionTaskQueueNode : IRunnable
{
readonly Action<object> action;
readonly object state;
public StateActionTaskQueueNode(Action<object> action, object state)
{
this.action = action;
this.state = state;
}
public void Run() => this.action(this.state);
}
sealed class StateActionWithContextTaskQueueNode : IRunnable
{
readonly Action<object, object> action;
readonly object context;
readonly object state;
public StateActionWithContextTaskQueueNode(Action<object, object> action, object context, object state)
{
this.action = action;
this.context = context;
this.state = state;
}
public void Run() => this.action(this.context, this.state);
}
abstract class FuncQueueNodeBase<T> : IRunnable
{
readonly TaskCompletionSource<T> promise;
readonly CancellationToken cancellationToken;
protected FuncQueueNodeBase(TaskCompletionSource<T> promise, CancellationToken cancellationToken)
{
this.promise = promise;
this.cancellationToken = cancellationToken;
}
public Task<T> Completion => this.promise.Task;
public void Run()
{
if (this.cancellationToken.IsCancellationRequested)
{
this.promise.TrySetCanceled();
return;
}
try
{
T result = this.Call();
this.promise.TrySetResult(result);
}
catch (Exception ex)
{
// todo: handle fatal
this.promise.TrySetException(ex);
}
}
protected abstract T Call();
}
sealed class FuncSubmitQueueNode<T> : FuncQueueNodeBase<T>
{
readonly Func<T> func;
public FuncSubmitQueueNode(Func<T> func, CancellationToken cancellationToken)
: base(new TaskCompletionSource<T>(), cancellationToken)
{
this.func = func;
}
protected override T Call() => this.func();
}
sealed class StateFuncSubmitQueueNode<T> : FuncQueueNodeBase<T>
{
readonly Func<object, T> func;
public StateFuncSubmitQueueNode(Func<object, T> func, object state, CancellationToken cancellationToken)
: base(new TaskCompletionSource<T>(state), cancellationToken)
{
this.func = func;
}
protected override T Call() => this.func(this.Completion.AsyncState);
}
sealed class StateFuncWithContextSubmitQueueNode<T> : FuncQueueNodeBase<T>
{
readonly Func<object, object, T> func;
readonly object context;
public StateFuncWithContextSubmitQueueNode(
Func<object, object, T> func,
object context,
object state,
CancellationToken cancellationToken)
: base(new TaskCompletionSource<T>(state), cancellationToken)
{
this.func = func;
this.context = context;
}
protected override T Call() => this.func(this.context, this.Completion.AsyncState);
}
#endregion
}
}

Просмотреть файл

@ -3,13 +3,15 @@
namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Thread = DotNetty.Common.Concurrency.XThread;
public interface IEventExecutor
public interface IEventExecutor : IEventExecutorGroup
{
/// <summary>
/// Parent <see cref="IEventExecutorGroup"/>.
/// </summary>
IEventExecutorGroup Parent { get; }
/// <summary>
/// Returns <c>true</c> if the current <see cref="Thread" /> belongs to this event loop,
/// <c>false</c> otherwise.
@ -20,239 +22,10 @@ namespace DotNetty.Common.Concurrency
/// </remarks>
bool InEventLoop { get; }
/// <summary>
/// Returns <c>true</c> if and only if this executor is being shut down via <see cref="ShutdownGracefullyAsync()" />.
/// </summary>
bool IsShuttingDown { get; }
/// <summary>
/// Gets a <see cref="Task" /> object that represents the asynchronous completion of this executor's termination.
/// </summary>
Task TerminationCompletion { get; }
/// <summary>
/// Returns <c>true</c> if this executor has been shut down, <c>false</c> otherwise.
/// </summary>
bool IsShutdown { get; }
/// <summary>
/// Returns <c>true</c> if all tasks have completed following shut down.
/// </summary>
/// <remarks>
/// Note that <see cref="IsTerminated" /> is never <c>true</c> unless <see cref="ShutdownGracefullyAsync()" /> was called first.
/// </remarks>
bool IsTerminated { get; }
/// <summary>
/// Parent <see cref="IEventExecutorGroup"/>.
/// </summary>
IEventExecutorGroup Parent { get; }
/// <summary>
/// Returns <c>true</c> if the given <see cref="Thread" /> belongs to this event loop,
/// <c>false></c> otherwise.
/// </summary>
bool IsInEventLoop(Thread thread);
/// <summary>
/// Executes the given task.
/// </summary>
/// <remarks>Threading specifics are determined by <c>IEventExecutor</c> implementation.</remarks>
void Execute(IRunnable task);
/// <summary>
/// Executes the given action.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
void Execute(Action<object> action, object state);
/// <summary>
/// Executes the given <paramref name="action" />.
/// </summary>
/// <remarks>Threading specifics are determined by <c>IEventExecutor</c> implementation.</remarks>
void Execute(Action action);
/// <summary>
/// Executes the given action.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
void Execute(Action<object, object> action, object context, object state);
/// <summary>
/// Creates and executes a one-shot action that becomes enabled after the given delay.
/// </summary>
/// <param name="action">the task to execute</param>
/// <param name="delay">the time from now to delay execution</param>
/// <returns>an <see cref="IScheduledTask" /> representing pending completion of the task.</returns>
IScheduledTask Schedule(IRunnable action, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action action, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action<object> action, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action action, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken);
/// <summary>
/// Executes the given function and returns <see cref="Task{T}" /> indicating completion status and result of
/// execution.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<T> func);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<T> func, CancellationToken cancellationToken);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<object, T> func, object state);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<object, T> func, object state, CancellationToken cancellationToken);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state, CancellationToken cancellationToken);
/// <summary>
/// Shortcut method for <see cref="ShutdownGracefullyAsync(TimeSpan,TimeSpan)" /> with sensible default values.
/// </summary>
Task ShutdownGracefullyAsync();
/// <summary>
/// Signals this executor that the caller wants the executor to be shut down. Once this method is called,
/// <see cref="IsShuttingDown" /> starts to return <c>true</c>, and the executor prepares to shut itself down.
/// Graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
/// (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period,
/// it is guaranteed to be accepted and the quiet period will start over.
/// </summary>
/// <param name="quietPeriod">the quiet period as described in the documentation.</param>
/// <param name="timeout">
/// the maximum amount of time to wait until the executor <see cref="IsShutdown" />
/// regardless if a task was submitted during the quiet period.
/// </param>
/// <returns>the <see cref="TerminationCompletion" /> task.</returns>
Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);
}
}

Просмотреть файл

@ -9,17 +9,12 @@ namespace DotNetty.Common.Concurrency
/// <summary>
/// Provides an access to a set of <see cref="IEventExecutor"/>s it manages.
/// </summary>
public interface IEventExecutorGroup
public interface IEventExecutorGroup : IScheduledExecutorService
{
/// <summary>
/// A <see cref="Task"/> for completion of termination. <see cref="ShutdownGracefullyAsync()"/>.
/// Returns <c>true</c> if and only if this executor is being shut down via <see cref="ShutdownGracefullyAsync()" />.
/// </summary>
Task TerminationCompletion { get; }
/// <summary>
/// Returns <see cref="IEventExecutor"/>.
/// </summary>
IEventExecutor GetNext();
bool IsShuttingDown { get; }
/// <summary>
/// Terminates this <see cref="IEventExecutorGroup"/> and all its <see cref="IEventExecutor"/>s.
@ -32,5 +27,15 @@ namespace DotNetty.Common.Concurrency
/// </summary>
/// <returns><see cref="Task"/> for completion of termination.</returns>
Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);
/// <summary>
/// A <see cref="Task"/> for completion of termination. <see cref="ShutdownGracefullyAsync()"/>.
/// </summary>
Task TerminationCompletion { get; }
/// <summary>
/// Returns <see cref="IEventExecutor"/>.
/// </summary>
IEventExecutor GetNext();
}
}

Просмотреть файл

@ -0,0 +1,41 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
public interface IExecutor
{
/// <summary>
/// Executes the given task.
/// </summary>
/// <remarks>Threading specifics are determined by <c>IEventExecutor</c> implementation.</remarks>
void Execute(IRunnable task);
/// <summary>
/// Executes the given action.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
void Execute(Action<object> action, object state);
/// <summary>
/// Executes the given <paramref name="action" />.
/// </summary>
/// <remarks>Threading specifics are determined by <c>IEventExecutor</c> implementation.</remarks>
void Execute(Action action);
/// <summary>
/// Executes the given action.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
void Execute(Action<object, object> action, object context, object state);
}
}

Просмотреть файл

@ -0,0 +1,81 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;
using System.Threading.Tasks;
public interface IExecutorService : IExecutor
{
/// <summary>
/// Returns <c>true</c> if this executor has been shut down, <c>false</c> otherwise.
/// </summary>
bool IsShutdown { get; }
/// <summary>
/// Returns <c>true</c> if all tasks have completed following shut down.
/// </summary>
/// <remarks>
/// Note that <see cref="IsTerminated" /> is never <c>true</c> unless <see cref="ShutdownGracefullyAsync()" /> was called first.
/// </remarks>
bool IsTerminated { get; }
/// <summary>
/// Executes the given function and returns <see cref="Task{T}" /> indicating completion status and result of
/// execution.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<T> func);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<T> func, CancellationToken cancellationToken);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<object, T> func, object state);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<object, T> func, object state, CancellationToken cancellationToken);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state);
/// <summary>
/// Executes the given action and returns <see cref="Task{T}" /> indicating completion status and result of execution.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task<T> SubmitAsync<T>(Func<object, object, T> func, object context, object state, CancellationToken cancellationToken);
}
}

Просмотреть файл

@ -0,0 +1,104 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;
using System.Threading.Tasks;
public interface IScheduledExecutorService : IExecutorService
{
/// <summary>
/// Creates and executes a one-shot action that becomes enabled after the given delay.
/// </summary>
/// <param name="action">the task to execute</param>
/// <param name="delay">the time from now to delay execution</param>
/// <returns>an <see cref="IScheduledTask" /> representing pending completion of the task.</returns>
IScheduledTask Schedule(IRunnable action, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action action, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state" /> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action<object> action, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action action, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay);
/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context" /> and <paramref name="state" /> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken);
}
}

Просмотреть файл

@ -22,8 +22,9 @@ namespace DotNetty.Transport.Libuv
Contract.Requires(parent != null);
string pipeName = "DotNetty_" + Guid.NewGuid().ToString("n");
this.PipeName = (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
? @"\\.\pipe\" : "/tmp/") + pipeName;
this.PipeName = (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
? @"\\.\pipe\"
: "/tmp/") + pipeName;
this.Start();
}
@ -63,8 +64,10 @@ namespace DotNetty.Transport.Libuv
internal void Accept(NativeHandle handle) => this.nativeUnsafe.Accept(handle);
public new IEventLoop GetNext() => (IEventLoop)base.GetNext();
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);
public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;
}
}
}

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// ReSharper disable ConvertToAutoPropertyWhenPossible
namespace DotNetty.Transport.Libuv
{
using System;
@ -9,7 +10,7 @@ namespace DotNetty.Transport.Libuv
using DotNetty.Common.Concurrency;
using DotNetty.Transport.Channels;
public sealed class DispatcherEventLoopGroup : IEventLoopGroup
public sealed class DispatcherEventLoopGroup : AbstractEventExecutorGroup, IEventLoopGroup
{
readonly DispatcherEventLoop dispatcherEventLoop;
@ -18,26 +19,26 @@ namespace DotNetty.Transport.Libuv
this.dispatcherEventLoop = new DispatcherEventLoop(this);
}
public Task TerminationCompletion => this.dispatcherEventLoop.TerminationCompletion;
public override bool IsShutdown => this.dispatcherEventLoop.IsShutdown;
public override bool IsTerminated => this.dispatcherEventLoop.IsTerminated;
public override bool IsShuttingDown => this.dispatcherEventLoop.IsShuttingDown;
public override Task TerminationCompletion => this.dispatcherEventLoop.TerminationCompletion;
internal DispatcherEventLoop Dispatcher => this.dispatcherEventLoop;
IEventExecutor IEventExecutorGroup.GetNext() => this.GetNext();
IEventLoop IEventLoopGroup.GetNext() => (IEventLoop)this.GetNext();
public Task RegisterAsync(IChannel channel) => this.GetNext().RegisterAsync(channel);
public override IEventExecutor GetNext() => this.dispatcherEventLoop;
public IEventLoop GetNext() => this.dispatcherEventLoop;
public Task RegisterAsync(IChannel channel) => ((IEventLoop)this.GetNext()).RegisterAsync(channel);
public Task ShutdownGracefullyAsync()
{
this.dispatcherEventLoop.ShutdownGracefullyAsync();
return this.TerminationCompletion;
}
public Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
{
this.dispatcherEventLoop.ShutdownGracefullyAsync(quietPeriod, timeout);
return this.TerminationCompletion;
}
}
}
}

Просмотреть файл

@ -14,8 +14,10 @@ namespace DotNetty.Transport.Libuv
this.Start();
}
public new IEventLoop GetNext() => (IEventLoop)base.GetNext();
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);
public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;
}
}
}

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// ReSharper disable ForCanBeConvertedToForeach
namespace DotNetty.Transport.Libuv
{
using System;
@ -12,13 +13,22 @@ namespace DotNetty.Transport.Libuv
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv.Native;
public sealed class EventLoopGroup : IEventLoopGroup
public sealed class EventLoopGroup : AbstractEventExecutorGroup, IEventLoopGroup
{
static readonly int DefaultEventLoopCount = Environment.ProcessorCount;
readonly EventLoop[] eventLoops;
int requestId;
public EventLoopGroup() : this(DefaultEventLoopCount)
public override bool IsShutdown => this.eventLoops.All(eventLoop => eventLoop.IsShutdown);
public override bool IsTerminated => this.eventLoops.All(eventLoop => eventLoop.IsTerminated);
public override bool IsShuttingDown => this.eventLoops.All(eventLoop => eventLoop.IsShuttingDown);
public override Task TerminationCompletion { get; }
public EventLoopGroup()
: this(DefaultEventLoopCount)
{
}
@ -43,9 +53,10 @@ namespace DotNetty.Transport.Libuv
{
if (!success)
{
Task.WhenAll(this.eventLoops
.Take(i)
.Select(loop => loop.ShutdownGracefullyAsync()))
Task.WhenAll(
this.eventLoops
.Take(i)
.Select(loop => loop.ShutdownGracefullyAsync()))
.Wait();
}
}
@ -53,14 +64,11 @@ namespace DotNetty.Transport.Libuv
this.eventLoops[i] = eventLoop;
terminationTasks[i] = eventLoop.TerminationCompletion;
}
this.TerminationCompletion = Task.WhenAll(terminationTasks);
}
public Task TerminationCompletion { get; }
IEventExecutor IEventExecutorGroup.GetNext() => this.GetNext();
public IEventLoop GetNext()
public override IEventExecutor GetNext()
{
// Attempt to select event loop based on thread first
int threadId = XThread.CurrentThread.Id;
@ -79,6 +87,8 @@ namespace DotNetty.Transport.Libuv
return this.eventLoops[Math.Abs(i % this.eventLoops.Length)];
}
IEventLoop IEventLoopGroup.GetNext() => (IEventLoop)this.GetNext();
public Task RegisterAsync(IChannel channel)
{
if (!(channel is NativeChannel nativeChannel))
@ -101,16 +111,7 @@ namespace DotNetty.Transport.Libuv
throw new InvalidOperationException($"Loop {loopHandle} does not exist");
}
public Task ShutdownGracefullyAsync()
{
foreach (EventLoop eventLoop in this.eventLoops)
{
eventLoop.ShutdownGracefullyAsync();
}
return this.TerminationCompletion;
}
public Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
{
foreach (EventLoop eventLoop in this.eventLoops)
{
@ -119,4 +120,4 @@ namespace DotNetty.Transport.Libuv
return this.TerminationCompletion;
}
}
}
}

Просмотреть файл

@ -3,6 +3,7 @@
// ReSharper disable ConvertToAutoProperty
// ReSharper disable ConvertToAutoPropertyWithPrivateSetter
namespace DotNetty.Transport.Libuv
{
using System;
@ -20,7 +21,8 @@ namespace DotNetty.Transport.Libuv
readonly string pipeName;
Pipe pipe;
public WorkerEventLoop(WorkerEventLoopGroup parent) : base(parent, null)
public WorkerEventLoop(WorkerEventLoopGroup parent)
: base(parent, null)
{
Contract.Requires(parent != null);
@ -104,6 +106,8 @@ namespace DotNetty.Transport.Libuv
}
}
public new IEventLoop GetNext() => (IEventLoop)base.GetNext();
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);
public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;
@ -138,11 +142,11 @@ namespace DotNetty.Transport.Libuv
}
}
void Connect() => NativeMethods.uv_pipe_connect(
void Connect() => NativeMethods.uv_pipe_connect(
this.Handle,
this.workerEventLoop.pipe.Handle,
this.workerEventLoop.pipeName,
WatcherCallback);
}
}
}
}

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// ReSharper disable ForCanBeConvertedToForeach
namespace DotNetty.Transport.Libuv
{
using System;
@ -15,7 +16,7 @@ namespace DotNetty.Transport.Libuv
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv.Native;
public sealed class WorkerEventLoopGroup : IEventLoopGroup
public sealed class WorkerEventLoopGroup : AbstractEventExecutorGroup, IEventLoopGroup
{
static readonly int DefaultEventLoopThreadCount = Environment.ProcessorCount;
static readonly TimeSpan StartTimeout = TimeSpan.FromMilliseconds(500);
@ -24,7 +25,15 @@ namespace DotNetty.Transport.Libuv
readonly DispatcherEventLoop dispatcherLoop;
int requestId;
public WorkerEventLoopGroup(DispatcherEventLoopGroup eventLoopGroup)
public override bool IsShutdown => this.eventLoops.All(eventLoop => eventLoop.IsShutdown);
public override bool IsTerminated => this.eventLoops.All(eventLoop => eventLoop.IsTerminated);
public override bool IsShuttingDown => this.eventLoops.All(eventLoop => eventLoop.IsShuttingDown);
public override Task TerminationCompletion { get; }
public WorkerEventLoopGroup(DispatcherEventLoopGroup eventLoopGroup)
: this(eventLoopGroup, DefaultEventLoopThreadCount)
{
}
@ -81,16 +90,14 @@ namespace DotNetty.Transport.Libuv
this.dispatcherLoop.Accept(handle);
}
public Task TerminationCompletion { get; }
IEventLoop IEventLoopGroup.GetNext() => (IEventLoop)this.GetNext();
public IEventLoop GetNext()
public override IEventExecutor GetNext()
{
int id = Interlocked.Increment(ref this.requestId);
return this.eventLoops[Math.Abs(id % this.eventLoops.Length)];
}
IEventExecutor IEventExecutorGroup.GetNext() => this.GetNext();
public Task RegisterAsync(IChannel channel)
{
if (!(channel is NativeChannel nativeChannel))
@ -100,7 +107,7 @@ namespace DotNetty.Transport.Libuv
NativeHandle handle = nativeChannel.GetHandle();
IntPtr loopHandle = handle.LoopHandle();
for (int i=0; i <this.eventLoops.Length; i++)
for (int i = 0; i < this.eventLoops.Length; i++)
{
if (this.eventLoops[i].UnsafeLoop.Handle == loopHandle)
{
@ -111,16 +118,7 @@ namespace DotNetty.Transport.Libuv
throw new InvalidOperationException($"Loop {loopHandle} does not exist");
}
public Task ShutdownGracefullyAsync()
{
foreach (WorkerEventLoop eventLoop in this.eventLoops)
{
eventLoop.ShutdownGracefullyAsync();
}
return this.TerminationCompletion;
}
public Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
{
foreach (WorkerEventLoop eventLoop in this.eventLoops)
{
@ -129,4 +127,4 @@ namespace DotNetty.Transport.Libuv
return this.TerminationCompletion;
}
}
}
}

Просмотреть файл

@ -10,10 +10,19 @@ namespace DotNetty.Transport.Channels
/// <summary>
/// <see cref="IEventLoopGroup"/> that works as a wrapper for another <see cref="IEventLoopGroup"/> providing affinity on <see cref="GetNext"/> call.
/// </summary>
public class AffinitizedEventLoopGroup : IEventLoopGroup
public class AffinitizedEventLoopGroup : AbstractEventExecutorGroup, IEventLoopGroup
{
readonly IEventLoopGroup innerGroup;
public override bool IsShutdown => this.innerGroup.IsShutdown;
public override bool IsTerminated => this.innerGroup.IsTerminated;
public override bool IsShuttingDown => this.innerGroup.IsShuttingDown;
/// <inheritdoc cref="IEventExecutorGroup"/>
public override Task TerminationCompletion => this.innerGroup.TerminationCompletion;
/// <summary>
/// Creates a new instance of <see cref="AffinitizedEventLoopGroup"/>.
/// </summary>
@ -23,24 +32,15 @@ namespace DotNetty.Transport.Channels
this.innerGroup = innerGroup;
}
/// <inheritdoc cref="IEventLoopGroup"/>
public Task TerminationCompletion => this.innerGroup.TerminationCompletion;
IEventExecutor IEventExecutorGroup.GetNext() => this.GetNext();
public Task RegisterAsync(IChannel channel) => this.GetNext().RegisterAsync(channel);
/// <summary>
/// If running in a context of an existing <see cref="IEventLoop"/>, this <see cref="IEventLoop"/> is returned.
/// Otherwise, <see cref="IEventLoop"/> is retrieved from underlying <see cref="IEventLoopGroup"/>.
/// </summary>
public IEventLoop GetNext()
public override IEventExecutor GetNext()
{
IEventExecutor executor;
if (ExecutionEnvironment.TryGetCurrentExecutor(out executor))
if (ExecutionEnvironment.TryGetCurrentExecutor(out var executor))
{
var loop = executor as IEventLoop;
if (loop != null && loop.Parent == this.innerGroup)
if (executor is IEventLoop loop && loop.Parent == this.innerGroup)
{
return loop;
}
@ -48,10 +48,11 @@ namespace DotNetty.Transport.Channels
return this.innerGroup.GetNext();
}
/// <inheritdoc cref="IEventLoopGroup"/>
public Task ShutdownGracefullyAsync() => this.innerGroup.ShutdownGracefullyAsync();
IEventLoop IEventLoopGroup.GetNext() => (IEventLoop)this.GetNext();
/// <inheritdoc cref="IEventLoopGroup"/>
public Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout) => this.innerGroup.ShutdownGracefullyAsync(quietPeriod, timeout);
public Task RegisterAsync(IChannel channel) => ((IEventLoop)this.GetNext()).RegisterAsync(channel);
/// <inheritdoc cref="IEventExecutorGroup"/>
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout) => this.innerGroup.ShutdownGracefullyAsync(quietPeriod, timeout);
}
}

Просмотреть файл

@ -14,16 +14,13 @@ namespace DotNetty.Transport.Channels.Embedded
{
readonly Queue<IRunnable> tasks = new Queue<IRunnable>(2);
public IEventExecutor Executor => this;
public new IEventLoop GetNext() => this;
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);
public override bool IsShuttingDown => false;
public override Task TerminationCompletion
{
get { throw new NotSupportedException(); }
}
public override Task TerminationCompletion => throw new NotSupportedException();
public override bool IsShutdown => false;

Просмотреть файл

@ -3,24 +3,16 @@
namespace DotNetty.Transport.Channels
{
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;
/// <summary>
/// <see cref="IEventExecutor"/> specialized to handle I/O operations of assigned <see cref="IChannel"/>s.
/// </summary>
public interface IEventLoop : IEventExecutor
public interface IEventLoop : IEventLoopGroup, IEventExecutor
{
/// <summary>
/// Parent <see cref="IEventLoopGroup"/>.
/// </summary>
new IEventLoopGroup Parent { get; }
/// <summary>
/// Registers provided <see cref="IChannel"/> with this <see cref="IEventLoop"/>.
/// </summary>
/// <param name="channel"><see cref="IChannel"/> to register.</param>
/// <returns><see cref="Task"/> for completion of registration.</returns>
Task RegisterAsync(IChannel channel);
}
}

Просмотреть файл

@ -6,8 +6,9 @@ namespace DotNetty.Transport.Channels
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;
/// <inheritdoc />
/// <summary>
/// <see cref="IEventExecutorGroup"/> specialized for handling <see cref="IEventLoop"/>s.
/// <see cref="IEventExecutorGroup" /> specialized for handling <see cref="IEventLoop" />s.
/// </summary>
public interface IEventLoopGroup : IEventExecutorGroup
{

Просмотреть файл

@ -12,7 +12,7 @@ namespace DotNetty.Transport.Channels
/// <summary>
/// <see cref="IEventLoopGroup"/> backed by a set of <see cref="SingleThreadEventLoop"/> instances.
/// </summary>
public sealed class MultithreadEventLoopGroup : IEventLoopGroup
public sealed class MultithreadEventLoopGroup : AbstractEventExecutorGroup, IEventLoopGroup
{
static readonly int DefaultEventLoopThreadCount = Environment.ProcessorCount * 2;
static readonly Func<IEventLoopGroup, IEventLoop> DefaultEventLoopFactory = group => new SingleThreadEventLoop(group);
@ -20,6 +20,15 @@ namespace DotNetty.Transport.Channels
readonly IEventLoop[] eventLoops;
int requestId;
public override bool IsShutdown => eventLoops.All(eventLoop => eventLoop.IsShutdown);
public override bool IsTerminated => eventLoops.All(eventLoop => eventLoop.IsTerminated);
public override bool IsShuttingDown => eventLoops.All(eventLoop => eventLoop.IsShuttingDown);
/// <inheritdoc />
public override Task TerminationCompletion { get; }
/// <summary>Creates a new instance of <see cref="MultithreadEventLoopGroup"/>.</summary>
public MultithreadEventLoopGroup()
: this(DefaultEventLoopFactory, DefaultEventLoopThreadCount)
@ -60,9 +69,10 @@ namespace DotNetty.Transport.Channels
{
if (!success)
{
Task.WhenAll(this.eventLoops
.Take(i)
.Select(loop => loop.ShutdownGracefullyAsync()))
Task.WhenAll(
this.eventLoops
.Take(i)
.Select(loop => loop.ShutdownGracefullyAsync()))
.Wait();
}
}
@ -74,32 +84,19 @@ namespace DotNetty.Transport.Channels
}
/// <inheritdoc />
public Task TerminationCompletion { get; }
IEventLoop IEventLoopGroup.GetNext() => (IEventLoop)this.GetNext();
/// <inheritdoc />
public IEventLoop GetNext()
public override IEventExecutor GetNext()
{
int id = Interlocked.Increment(ref this.requestId);
return this.eventLoops[Math.Abs(id % this.eventLoops.Length)];
}
/// <inheritdoc />
IEventExecutor IEventExecutorGroup.GetNext() => this.GetNext();
public Task RegisterAsync(IChannel channel) => ((IEventLoop)this.GetNext()).RegisterAsync(channel);
public Task RegisterAsync(IChannel channel) => this.GetNext().RegisterAsync(channel);
/// <inheritdoc />
public Task ShutdownGracefullyAsync()
{
foreach (IEventLoop eventLoop in this.eventLoops)
{
eventLoop.ShutdownGracefullyAsync();
}
return this.TerminationCompletion;
}
/// <inheritdoc />
public Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
/// <inheritdoc cref="IEventExecutorGroup.ShutdownGracefullyAsync()" />
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
{
foreach (IEventLoop eventLoop in this.eventLoops)
{

Просмотреть файл

@ -63,6 +63,8 @@ namespace DotNetty.Transport.Channels
{
}
public new IEventLoop GetNext() => this;
/// <inheritdoc />
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);