Updated TaskScheduler to allow to utilize all the logical processors.

Fixed buffer size overflow catches.
This commit is contained in:
DJGosnell 2019-09-27 18:17:06 -04:00
Родитель 47f9d376e6
Коммит 5fc826c84d
13 изменённых файлов: 222 добавлений и 148 удалений

Просмотреть файл

@ -49,7 +49,7 @@ namespace DtronixMessageQueue.Tests.Layers.Application.Tls
await _innerStream.ReadAsync(buffer, _cancellationTokenSource.Token),
_cancellationTokenSource.Token);
_innerStream.AsyncReadReceived(_data, _cancellationTokenSource.Token);
_innerStream.AsyncReadReceived(_data, _cancellationTokenSource.Token).Wait();
task.Wait();
@ -71,7 +71,7 @@ namespace DtronixMessageQueue.Tests.Layers.Application.Tls
Assert.AreEqual(_data.Span[i], buffer.Span[0]);
}
},_cancellationTokenSource.Token);
_innerStream.AsyncReadReceived(_data, _cancellationTokenSource.Token);
_innerStream.AsyncReadReceived(_data, _cancellationTokenSource.Token).Wait();
task.Wait();
}
@ -90,23 +90,10 @@ namespace DtronixMessageQueue.Tests.Layers.Application.Tls
for (int i = 0; i < 10; i++)
{
_innerStream.AsyncReadReceived(_data.Slice(i, 1), _cancellationTokenSource.Token);
_innerStream.AsyncReadReceived(_data.Slice(i, 1), _cancellationTokenSource.Token).Wait();
}
task.Wait();
}
[Test]
public void AsyncReadTransitionsToSyncRead()
{
var buffer = new Memory<byte>(new byte[20]);
AsyncReadsWithSmallReceivedData();
_innerStream.AsyncMode = false;
}
}
}

Просмотреть файл

@ -88,7 +88,7 @@ namespace DtronixMessageQueue.Tests.Transports
Task.Run(async () =>
{
e.Session.Send(memory.Slice(0, 5), true);
await Task.Delay(50);
//await Task.Delay(50);
e.Session.Send(memory.Slice(5, 5), true);
});
};

Просмотреть файл

@ -92,12 +92,14 @@ namespace DtronixMessageQueue.Tests.Transports
ApplicationServerConfig = new ApplicationConfig
{
Logger = Logger
Logger = Logger,
TransportConfig = ServerConfig
};
ApplicationClientConfig = new ApplicationConfig
{
Logger = Logger
Logger = Logger,
TransportConfig = ClientConfig
};
TlsClientConfig = new TlsApplicationConfig
@ -105,7 +107,8 @@ namespace DtronixMessageQueue.Tests.Transports
Certificate = TlsCertificate,
CertificateValidationCallback = (sender, certificate, chain, errors)
=> RemoteCertificateValidationCallback(sender, certificate, chain, errors),
Logger = Logger
Logger = Logger,
TransportConfig = ClientConfig
};
TlsServerConfig = new TlsApplicationConfig
@ -113,7 +116,8 @@ namespace DtronixMessageQueue.Tests.Transports
Certificate = TlsCertificate,
CertificateValidationCallback = (sender, certificate, chain, errors)
=> RemoteCertificateValidationCallback(sender, certificate, chain, errors),
Logger = Logger
Logger = Logger,
TransportConfig = ServerConfig
};
RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;

Просмотреть файл

@ -1,7 +1,11 @@
namespace DtronixMessageQueue.Layers.Application
using DtronixMessageQueue.Layers.Transports;
namespace DtronixMessageQueue.Layers.Application
{
public class ApplicationConfig
{
public MqLogger Logger { get; set; }
public TransportConfig TransportConfig { get; set; }
}
}

Просмотреть файл

@ -19,6 +19,10 @@ namespace DtronixMessageQueue.Layers.Application
public SessionState State => TransportSession.State;
protected Exception LastSendException;
private int _bufferSize;
protected ApplicationSession(ITransportSession transportSession, ApplicationConfig config)
{
_config = config;
@ -30,6 +34,8 @@ namespace DtronixMessageQueue.Layers.Application
TransportSession.Ready += OnTransportSessionReady;
Mode = transportSession.Mode;
_bufferSize = _config.TransportConfig.SendAndReceiveBufferSize;
}
protected virtual void OnTransportSessionReady(object sender, SessionEventArgs e)
@ -71,8 +77,26 @@ namespace DtronixMessageQueue.Layers.Application
public virtual void Send(ReadOnlyMemory<byte> buffer, bool flush)
{
if (buffer.Length > _bufferSize)
{
_config.Logger?.Error(
$"{Mode} TlsApplication Sending {buffer.Length} bytes exceeds the SendAndReceiveBufferSize[{_bufferSize}].");
throw new Exception(
$"{Mode} TlsApplication Sending {buffer.Length} bytes exceeds the SendAndReceiveBufferSize[{_bufferSize}].");
}
_config.Logger?.Trace($"{Mode} Application sent {buffer.Length} bytes. Flush: {flush}");
TransportSession.Send(buffer, flush);
try
{
TransportSession.Send(buffer, flush);
}
catch (Exception e)
{
LastSendException = e;
}
}
}
}

Просмотреть файл

@ -6,19 +6,19 @@ namespace DtronixMessageQueue.Layers.Application.Tls
{
private readonly TlsApplicationConfig _config;
private readonly BufferMemoryPool _memoryPool;
private readonly TlsAuthScheduler _tlsAuthScheduler;
private readonly TlsTaskScheduler _tlsTaskScheduler;
public TlsApplicationClientConnector(ITransportFactory factory, TlsApplicationConfig config)
: base(factory)
{
_config = config;
_memoryPool = new BufferMemoryPool(factory.Config.SendAndReceiveBufferSize, 2 * factory.Config.MaxConnections);
_tlsAuthScheduler = new TlsAuthScheduler();
_tlsTaskScheduler = new TlsTaskScheduler(config);
}
protected override ApplicationSession CreateSession(ITransportSession session)
{
return new TlsApplicationSession(session, _config, _memoryPool, _tlsAuthScheduler);
return new TlsApplicationSession(session, _config, _memoryPool, _tlsTaskScheduler);
}
}
}

Просмотреть файл

@ -10,5 +10,7 @@ namespace DtronixMessageQueue.Layers.Application.Tls
public int AuthTimeout { get; set; } = 5000;
public RemoteCertificateValidationCallback CertificateValidationCallback { get; set; }
public int TlsSchedulerThreads { get; set; } = -1;
}
}

Просмотреть файл

@ -6,20 +6,20 @@ namespace DtronixMessageQueue.Layers.Application.Tls
{
private readonly TlsApplicationConfig _config;
private readonly BufferMemoryPool _memoryPool;
private readonly TlsAuthScheduler _tlsAuthScheduler;
private readonly TlsTaskScheduler _tlsTaskScheduler;
public TlsApplicationListener(ITransportFactory factory, TlsApplicationConfig config)
: base(factory)
{
_config = config;
_memoryPool = new BufferMemoryPool(factory.Config.SendAndReceiveBufferSize, 2 * factory.Config.MaxConnections);
_tlsAuthScheduler = new TlsAuthScheduler();
_tlsTaskScheduler = new TlsTaskScheduler(config);
}
protected override ApplicationSession CreateSession(ITransportSession session)
{
return new TlsApplicationSession(session, _config, _memoryPool, _tlsAuthScheduler);
return new TlsApplicationSession(session, _config, _memoryPool, _tlsTaskScheduler);
}
}
}

Просмотреть файл

@ -1,5 +1,6 @@
using System;
using System.Buffers;
using System.Data;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
@ -11,30 +12,33 @@ namespace DtronixMessageQueue.Layers.Application.Tls
public class TlsApplicationSession : ApplicationSession
{
private readonly TlsApplicationConfig _config;
private TlsAuthScheduler _scheduler;
private TlsTaskScheduler _taskScheduler;
private TlsInnerStream _innerStream;
private SslStream _tlsStream;
private IMemoryOwner<byte> _tlsReadBuffer;
private X509Certificate _sessionCertificate;
private SemaphoreSlim _authSemaphore = new SemaphoreSlim(0, 1);
private CancellationTokenSource _authSemaphoreCancellationTokenSource;
//private SemaphoreSlim _authSemaphore = new SemaphoreSlim(0, 1);
private CancellationTokenSource _cancellationTokenSource;
private readonly int _bufferSize;
public TlsApplicationSession(ITransportSession transportSession, TlsApplicationConfig config, BufferMemoryPool memoryPool, TlsAuthScheduler scheduler)
public TlsApplicationSession(ITransportSession transportSession, TlsApplicationConfig config, BufferMemoryPool memoryPool, TlsTaskScheduler taskScheduler)
:base(transportSession, config)
{
_config = config;
_scheduler = scheduler;
_taskScheduler = taskScheduler;
_innerStream = new TlsInnerStream(OnTlsStreamWrite);
_tlsStream = new SslStream(_innerStream, true, _config.CertificateValidationCallback);
_tlsReadBuffer = memoryPool.Rent();
_bufferSize = _tlsReadBuffer.Memory.Length;
_cancellationTokenSource = new CancellationTokenSource(_config.AuthTimeout);
}
protected override void OnTransportSessionConnected(object sender, SessionEventArgs e)
{
Task.Factory.StartNew(AuthenticateSession, _scheduler);
Task.Factory.StartNew(StartSession, _taskScheduler);
// Alert that we are connected, but not ready.
base.OnTransportSessionConnected(this, new SessionEventArgs(this));
@ -46,90 +50,111 @@ namespace DtronixMessageQueue.Layers.Application.Tls
// Do nothing as at this point, we are not authenticated.
}
private async void AuthenticateSession(object obj)
private async void StartSession(object obj)
{
_authSemaphoreCancellationTokenSource = new CancellationTokenSource(_config.AuthTimeout);
try
{
if (TransportSession.Mode == SessionMode.Client)
await _tlsStream.AuthenticateAsClientAsync("tlstest"); // TODO: Change!
{
await _tlsStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions
{
TargetHost = "tlstest" // TODO: Change!
},
_cancellationTokenSource.Token);
}
else
await _tlsStream.AuthenticateAsServerAsync(_config.Certificate);
{
await _tlsStream.AuthenticateAsServerAsync(new SslServerAuthenticationOptions()
{
ServerCertificate = _config.Certificate
}, _cancellationTokenSource.Token);
}
}
catch (Exception e)
{
_config.Logger.Warn($"{Mode} TlsApplication authentication failure. {e}");
}
// Allow reading of data now that the authentication process has completed.
_authSemaphore.Release();
if (!_tlsStream.IsAuthenticated)
{
Disconnect();
return;
}
_config.Logger?.Trace($"{Mode} TlsApplication authentication: {_tlsStream.IsAuthenticated}");
// Only pass on the ready status if the authentication is successful.
if(_tlsStream.IsAuthenticated)
base.OnTransportSessionReady(this, new SessionEventArgs(this));
try
{
var len = 0;
while ((len = await _tlsStream.ReadAsync(_tlsReadBuffer.Memory, _cancellationTokenSource.Token)) != 0)
{
base.OnSessionReceive(_tlsReadBuffer.Memory.Slice(0, len));
}
}
catch (OperationCanceledException)
{
// Ignored
}
catch (Exception e)
{
_config.Logger?.Error($"{Mode} TlsApplication read exception: {e}");
}
}
protected override void OnTransportSessionDisconnected(object sender, SessionEventArgs e)
{
_authSemaphore?.Release();
_cancellationTokenSource.Cancel();
base.OnTransportSessionDisconnected(sender, e);
}
protected override async void OnSessionReceive(ReadOnlyMemory<byte> buffer)
protected override void OnSessionReceive(ReadOnlyMemory<byte> buffer)
{
_config.Logger?.Trace($"{Mode} TlsApplication read {buffer.Length} encrypted bytes.");
var read = _innerStream.AsyncReadReceived(buffer, _cancellationTokenSource.Token);
_innerStream.AsyncReadReceived(buffer, _authSemaphoreCancellationTokenSource.Token);
// If there is not a wait on the inner stream pending, this will mean that the reads are complete
// and the authentication process is in progress. We need to wait for this process to complete
// before we can read data.
// TODO: Tests for deadlocks.
if (_authSemaphore != null
&& !_tlsStream.IsAuthenticated
&& !_innerStream.IsReadWaiting)
try
{
_config.Logger?.Trace($"{Mode} TlsApplication authentication in process of completing. Semaphore waiting...");
try
{
_authSemaphore.Wait(_authSemaphoreCancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
_config.Logger?.Trace($"{Mode} TlsApplication authentication timed out.");
Disconnect();
}
_authSemaphore.Dispose();
_authSemaphore = null;
_config.Logger?.Trace($"{Mode} TlsApplication authentication semaphore released.");
// Block further reads until the entire buffer has been read.
read.Wait(_cancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
// Ignored
}
catch (Exception e)
{
_config.Logger?.Trace($"{Mode} TlsApplication exception occured while receiving. {e}");
throw;
}
// If we are not authenticated, then this data should be relegated to the auth process only.
if (!_tlsStream.IsAuthenticated)
{
_config.Logger?.Trace($"{Mode} TlsApplication not authenticated.");
return;
}
// This can block...
var read = await _tlsStream.ReadAsync(_tlsReadBuffer.Memory);
_config.Logger?.Trace($"{Mode} TlsApplication read {read} clear bytes.");
if (read > 0)
base.OnSessionReceive(_tlsReadBuffer.Memory.Slice(0, read));
}
public override void Send(ReadOnlyMemory<byte> buffer, bool flush)
{
if (buffer.Length > _bufferSize)
{
_config.Logger?.Error(
$"{Mode} TlsApplication Sending {buffer.Length} bytes exceeds the SendAndReceiveBufferSize[{_bufferSize}].");
throw new Exception(
$"{Mode} TlsApplication Sending {buffer.Length} bytes exceeds the SendAndReceiveBufferSize[{_bufferSize}].");
}
_tlsStream.Write(buffer.Span);
if(flush)
_tlsStream.Flush();
if (LastSendException != null)
throw LastSendException;
}

Просмотреть файл

@ -1,60 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace DtronixMessageQueue.Layers.Application.Tls
{
public sealed class TlsAuthScheduler : TaskScheduler, IDisposable
{
private readonly BlockingCollection<Task> _tasksCollection = new BlockingCollection<Task>();
private readonly Thread _mainThread;
public TlsAuthScheduler()
{
_mainThread = new Thread(Execute);
if (!_mainThread.IsAlive)
{
_mainThread.Start();
}
}
private void Execute()
{
foreach (var task in _tasksCollection.GetConsumingEnumerable())
TryExecuteTask(task);
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasksCollection.ToArray();
}
protected override void QueueTask(Task task)
{
_tasksCollection.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
private void Dispose(bool disposing)
{
if (!disposing) return;
_tasksCollection.CompleteAdding();
_tasksCollection.Dispose();
_mainThread.Abort();
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
}

Просмотреть файл

@ -53,12 +53,12 @@ namespace DtronixMessageQueue.Layers.Application.Tls
/// </summary>
/// <param name="buffer"></param>
/// <param name="cancellationToken"></param>
public void AsyncReadReceived(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
public async Task AsyncReadReceived(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
if(!AsyncMode)
throw new InvalidOperationException("Stream in synchronous mode. Can not use async methods.");
_receiveSemaphore.Wait(cancellationToken);
await _receiveSemaphore.WaitAsync(cancellationToken);
_received = buffer;
_receivePosition = 0;

Просмотреть файл

@ -0,0 +1,88 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace DtronixMessageQueue.Layers.Application.Tls
{
public sealed class TlsTaskScheduler : TaskScheduler, IDisposable
{
private class TaskThread
{
public CancellationTokenSource CancellationTokenSource;
public Thread Thread;
}
private readonly BlockingCollection<Task> _tasksCollection = new BlockingCollection<Task>();
private readonly List<TaskThread> _threads = new List<TaskThread>();
public TlsTaskScheduler(TlsApplicationConfig config)
{
var threads = config.TlsSchedulerThreads;
if (threads == -1)
threads = Environment.ProcessorCount;
for (int i = 0; i < threads; i++)
{
var thread = new Thread(Execute)
{
Name = $"TlsTaskScheduler-{i}"
};
var taskThread = new TaskThread
{
Thread = thread,
CancellationTokenSource = new CancellationTokenSource()
};
_threads.Add(taskThread);
thread.Start(taskThread);
}
}
private void Execute(object thread)
{
var taskThread = (TaskThread) thread;
foreach (var task in _tasksCollection.GetConsumingEnumerable(taskThread.CancellationTokenSource.Token))
TryExecuteTask(task);
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasksCollection.ToArray();
}
protected override void QueueTask(Task task)
{
_tasksCollection.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
private void Dispose(bool disposing)
{
if (!disposing) return;
_tasksCollection.CompleteAdding();
_tasksCollection.Dispose();
foreach (var thread in _threads)
{
thread.CancellationTokenSource.Cancel();
thread.Thread.Abort();
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
}

Просмотреть файл

@ -38,7 +38,7 @@ namespace DtronixMessageQueue.Layers.Transports.Tcp
/// <summary>
/// Reset event used to ensure only one MqWorker can write to the socket at a time.
/// </summary>
private readonly SemaphoreSlim _writeSemaphore;
private readonly SemaphoreSlim _sendSemaphore;
public TcpTransportSession(
Socket socket,
@ -48,7 +48,7 @@ namespace DtronixMessageQueue.Layers.Transports.Tcp
{
_socket = socket;
_config = config;
_writeSemaphore = new SemaphoreSlim(1, 1);
_sendSemaphore = new SemaphoreSlim(1, 1);
Mode = mode;
_sendArgs = new TcpTransportAsyncEventArgs(memoryPool);
@ -144,7 +144,7 @@ namespace DtronixMessageQueue.Layers.Transports.Tcp
}
_config.Logger?.Trace($"{Mode} Acquiring write semaphore...");
_writeSemaphore.Wait(-1);
_sendSemaphore.Wait(-1);
_config.Logger?.Trace($"{Mode} Acquired write semaphore.");
var remaining = _sendArgs.Write(buffer);
@ -198,7 +198,7 @@ namespace DtronixMessageQueue.Layers.Transports.Tcp
{
_sendArgs?.Free();
_receiveArgs?.Free();
_writeSemaphore?.Dispose();
_sendSemaphore?.Dispose();
State = SessionState.Closed;
_socket.Close(1000);
@ -258,7 +258,7 @@ namespace DtronixMessageQueue.Layers.Transports.Tcp
_sendArgs.ResetSend();
_config.Logger?.Trace($"{Mode} Sending {e.BytesTransferred} bytes complete. Releasing Semaphore...");
_writeSemaphore.Release(1);
_sendSemaphore.Release(1);
_config.Logger?.Trace($"{Mode} Released semaphore. Session Sent method called." + (Sent == null ? " No connected method." : ""));
Sent?.Invoke(this);