Fixes ByteBuffer+Stream integration, TLS neg, STEE Shutdown

Motivation:
Fix up top priority issues to ensure proper working state with recent changes.

Modifications:
- TlsHandler negotiates TLS 1.0+ on server side (#89)
- STEE properly supports graceful shutdown (#7)
- UnpooledHeapByteBuffer.GetBytes honors received index and length (#88)
- Echo E2E test uses length-prefix based framing (#90)

Result:
Setting up DotNetty does not require workarounds for shutdown and hacks to enable negotiation of higher versions of TLS. Tests are passing even with new SslStream behavior.
This commit is contained in:
Max Gortman 2016-04-27 00:29:56 -07:00
Родитель d3e6fe4b54
Коммит aa3cbc6766
9 изменённых файлов: 62 добавлений и 49 удалений

Просмотреть файл

@ -8,6 +8,7 @@ namespace Echo.Client
using System.Net;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
@ -42,10 +43,12 @@ namespace Echo.Client
string targetHost = cert.GetNameInfo(X509NameType.DnsName, false);
pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
}
pipeline.AddLast(new LengthFieldPrepender(2));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
pipeline.AddLast(new EchoClientHandler());
}));
IChannel bootstrapChannel = await bootstrap.ConnectAsync(new IPEndPoint(EchoClientSettings.Host, EchoClientSettings.Port));
Console.ReadLine();
@ -64,4 +67,4 @@ namespace Echo.Client
Task.Run(() => RunClient()).Wait();
}
}
}
}

Просмотреть файл

@ -7,6 +7,7 @@ namespace Echo.Server
using System.Diagnostics.Tracing;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
@ -41,6 +42,8 @@ namespace Echo.Server
{
pipeline.AddLast(TlsHandler.Server(new X509Certificate2("dotnetty.com.pfx", "password")));
}
pipeline.AddLast(new LengthFieldPrepender(2));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
pipeline.AddLast(new EchoServerHandler());
}));
@ -63,4 +66,4 @@ namespace Echo.Server
Task.Run(() => RunServer()).Wait();
}
}
}
}

Просмотреть файл

@ -635,10 +635,11 @@ namespace DotNetty.Buffers
else
{
IByteBuffer buffer = src.Allocator.Buffer(len);
Contract.Assert(buffer.HasArray, "Operation expects allocator to operate array-based buffers.");
try
{
buffer.WriteBytes(src, readerIndex, len);
return encoding.GetString(buffer.Array, 0, len);
return encoding.GetString(buffer.Array, buffer.ArrayOffset, len);
}
finally
{

Просмотреть файл

@ -148,7 +148,7 @@ namespace DotNetty.Buffers
public override IByteBuffer GetBytes(int index, Stream destination, int length)
{
destination.Write(this.Array, this.ArrayOffset + this.ReaderIndex, this.ReadableBytes);
destination.Write(this.Array, this.ArrayOffset + index, length);
return this;
}

Просмотреть файл

@ -21,6 +21,8 @@ namespace DotNetty.Common.Concurrency
const int ST_TERMINATED = 5;
const string DefaultWorkerThreadName = "SingleThreadEventExecutor worker";
static readonly IRunnable WAKEUP_TASK = new NoOpRunnable();
static readonly IInternalLogger Logger =
InternalLoggerFactory.GetInstance<SingleThreadEventExecutor>();
@ -70,15 +72,12 @@ namespace DotNetty.Common.Concurrency
Task.Factory.StartNew(
() =>
{
if (Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED) == ST_NOT_STARTED)
Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED);
while (!this.ConfirmShutdown())
{
while (!this.ConfirmShutdown())
{
this.RunAllTasks(this.preciseBreakoutInterval);
}
this.CleanupAndTerminate(true);
this.RunAllTasks(this.preciseBreakoutInterval);
}
this.CleanupAndTerminate(true);
},
CancellationToken.None,
TaskCreationOptions.None,
@ -120,6 +119,14 @@ namespace DotNetty.Common.Concurrency
}
}
protected void WakeUp(bool inEventLoop)
{
if (!inEventLoop || this.executionState == ST_SHUTTING_DOWN)
{
this.Execute(WAKEUP_TASK);
}
}
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
{
Contract.Requires(quietPeriod >= TimeSpan.Zero);
@ -174,10 +181,10 @@ namespace DotNetty.Common.Concurrency
// scheduleExecution();
//}
//if (wakeup)
//{
// wakeup(inEventLoop);
//}
if (wakeup)
{
this.WakeUp(inEventLoop);
}
return this.TerminationCompletion;
}
@ -189,10 +196,7 @@ namespace DotNetty.Common.Concurrency
return false;
}
if (!this.InEventLoop)
{
throw new InvalidOperationException("must be invoked from an event loop");
}
Contract.Assert(this.InEventLoop, "must be invoked from an event loop");
this.CancelScheduledTasks();
@ -210,8 +214,7 @@ namespace DotNetty.Common.Concurrency
}
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
// todo: ???
//wakeup(true);
this.WakeUp(true);
return false;
}
@ -227,7 +230,7 @@ namespace DotNetty.Common.Concurrency
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
// todo: ???
//wakeup(true);
this.WakeUp(true);
Thread.Sleep(100);
return false;
@ -243,7 +246,6 @@ namespace DotNetty.Common.Concurrency
while (true)
{
int oldState = this.executionState;
;
if (oldState >= ST_SHUTTING_DOWN || Interlocked.CompareExchange(ref this.executionState, ST_SHUTTING_DOWN, oldState) == oldState)
{
break;
@ -398,7 +400,7 @@ namespace DotNetty.Common.Concurrency
if (task == null)
{
this.emptyEvent.Reset();
if ((task = this.taskQueue.Dequeue()) == null) // revisit queue as producer might have put a task in meanwhile
if ((task = this.taskQueue.Dequeue()) == null && !this.IsShuttingDown) // revisit queue as producer might have put a task in meanwhile
{
IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek();
if (nextScheduledTask != null)
@ -424,27 +426,11 @@ namespace DotNetty.Common.Concurrency
return task;
}
#region IDisposable members
public void Dispose()
sealed class NoOpRunnable : IRunnable
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
public void Dispose(bool isDisposing)
{
if (!this.disposed)
public void Run()
{
if (isDisposing)
{
this.thread = null;
}
}
this.disposed = true;
}
#endregion
}
}

Просмотреть файл

@ -247,7 +247,7 @@ namespace DotNetty.Handlers.Tls
this.state = oldState | State.Authenticating;
if (this.isServer)
{
this.sslStream.AuthenticateAsServerAsync(this.certificate) // todo: change to begin/end
this.sslStream.AuthenticateAsServerAsync(this.certificate, false, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false) // todo: change to begin/end
.ContinueWith(AuthenticationCompletionCallback, this, TaskContinuationOptions.ExecuteSynchronously);
}
else

Просмотреть файл

@ -118,6 +118,21 @@ namespace DotNetty.Common.Tests.Concurrency
Assert.True(task.IsCompleted);
}
[Theory]
[InlineData(0)]
[InlineData(200)]
public async Task ShutdownWhileIdle(int delayInMs)
{
var scheduler = new SingleThreadEventExecutor("test", TimeSpan.FromMilliseconds(10));
if (delayInMs > 0)
{
Thread.Sleep(delayInMs);
}
Task shutdownTask = scheduler.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(1));
await Task.WhenAny(shutdownTask, Task.Delay(TimeSpan.FromSeconds(5)));
Assert.True(shutdownTask.IsCompleted);
}
class Container<T>
{
public T Value;

Просмотреть файл

@ -21,6 +21,7 @@
<NuGetPackageImportStamp>8624fbb3</NuGetPackageImportStamp>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\</SolutionDir>
<RestorePackages>true</RestorePackages>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>

Просмотреть файл

@ -11,6 +11,7 @@ namespace DotNetty.Tests.End2End
using System.Text;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Codecs.Mqtt;
using DotNetty.Codecs.Mqtt.Packets;
using DotNetty.Common.Concurrency;
@ -49,7 +50,9 @@ namespace DotNetty.Tests.End2End
var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
Func<Task> closeServerFunc = await this.StartServerAsync(true, ch =>
{
ch.Pipeline.AddLast(TlsHandler.Server(tlsCertificate));
ch.Pipeline.AddLast("server tls", TlsHandler.Server(tlsCertificate));
ch.Pipeline.AddLast("server prepender", new LengthFieldPrepender(2));
ch.Pipeline.AddLast("server decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
ch.Pipeline.AddLast(new EchoChannelHandler());
}, testPromise);
@ -61,7 +64,9 @@ namespace DotNetty.Tests.End2End
.Handler(new ActionChannelInitializer<ISocketChannel>(ch =>
{
string targetHost = tlsCertificate.GetNameInfo(X509NameType.DnsName, false);
ch.Pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
ch.Pipeline.AddLast("client tls", TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
ch.Pipeline.AddLast("client prepender", new LengthFieldPrepender(2));
ch.Pipeline.AddLast("client decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
ch.Pipeline.AddLast(new TestScenarioRunner(this.GetEchoClientScenario, testPromise));
}));
@ -74,8 +79,8 @@ namespace DotNetty.Tests.End2End
this.Output.WriteLine("Connected channel: {0}", clientChannel);
await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromMinutes(1)));
Assert.True(testPromise.Task.IsCompleted);
await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromSeconds(30)));
Assert.True(testPromise.Task.IsCompleted, "timed out");
testPromise.Task.Wait();
}
finally
@ -277,7 +282,6 @@ namespace DotNetty.Tests.End2End
var bossGroup = new MultithreadEventLoopGroup(1);
var workerGroup = new MultithreadEventLoopGroup();
bool started = false;
//var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
try
{
ServerBootstrap b = new ServerBootstrap()