Seperate message pump into higher layer.

This commit is contained in:
Chris Ross 2014-03-07 12:19:36 -08:00
Родитель 8ac4bbd6cf
Коммит 38f5793e3d
6 изменённых файлов: 230 добавлений и 184 удалений

Просмотреть файл

@ -40,8 +40,6 @@ namespace Microsoft.AspNet.Server.WebListener
private static readonly int BindingInfoSize =
Marshal.SizeOf<UnsafeNclNativeMethods.HttpApi.HTTP_BINDING_INFO>();
#endif
private static readonly int DefaultMaxAccepts = 5 * Environment.ProcessorCount;
private static readonly int DefaultMaxRequests = Int32.MaxValue;
// Win8# 559317 fixed a bug in Http.sys's HttpReceiveClientCertificate method.
// Without this fix IOCP callbacks were not being called although ERROR_IO_PENDING was
@ -60,8 +58,6 @@ namespace Microsoft.AspNet.Server.WebListener
private readonly ConcurrentDictionary<ulong, ConnectionCancellation> _connectionCancellationTokens;
private AppFunc _appFunc;
private IDictionary<string, object> _capabilities;
private LoggerFunc _logger;
private SafeHandle _requestQueueHandle;
@ -77,11 +73,6 @@ namespace Microsoft.AspNet.Server.WebListener
private List<Prefix> _uriPrefixes = new List<Prefix>();
private PumpLimits _pumpLimits;
private int _acceptorCounts;
private Action<object> _processRequest;
private readonly AwaitableThrottle _requestProcessingThrottle;
// The native request queue
private long? _requestQueueLength;
@ -101,11 +92,6 @@ namespace Microsoft.AspNet.Server.WebListener
_timeoutManager = new TimeoutManager(this);
_authManager = new AuthenticationManager(this);
_connectionCancellationTokens = new ConcurrentDictionary<ulong, ConnectionCancellation>();
_processRequest = new Action<object>(ProcessRequestAsync);
_pumpLimits = new PumpLimits(DefaultMaxAccepts, DefaultMaxRequests);
_requestProcessingThrottle = new AwaitableThrottle(DefaultMaxRequests);
}
internal enum State
@ -125,11 +111,6 @@ namespace Microsoft.AspNet.Server.WebListener
get { return _uriPrefixes; }
}
internal IDictionary<string, object> Capabilities
{
get { return _capabilities; }
}
internal SafeHandle RequestQueueHandle
{
get
@ -190,46 +171,6 @@ namespace Microsoft.AspNet.Server.WebListener
}
}
/// <summary>
/// These are merged as one operation because they should be swapped out atomically.
/// This controls how many requests the server attempts to process concurrently.
/// </summary>
/// <param name="maxAccepts">The maximum number of pending accepts.</param>
/// <param name="maxRequests">The maximum number of outstanding requests.</param>
public void SetRequestProcessingLimits(int maxAccepts, int maxRequests)
{
_pumpLimits = new PumpLimits(maxAccepts, maxRequests);
if (_state == State.Started)
{
ActivateRequestProcessingLimits();
}
}
private void ActivateRequestProcessingLimits()
{
_requestProcessingThrottle.MaxConcurrent = _pumpLimits.MaxOutstandingRequests;
for (int i = _acceptorCounts; i < _pumpLimits.MaxOutstandingAccepts; i++)
{
ProcessRequestsWorker();
}
}
/// <summary>
/// Gets the request processing limits.
/// </summary>
/// <param name="maxAccepts">The maximum number of pending accepts.</param>
/// <param name="maxRequests">The maximum number of outstanding requests.</param>
[SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "0#", Justification = "By design")]
[SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "1#", Justification = "By design")]
public void GetRequestProcessingLimits(out int maxAccepts, out int maxRequests)
{
PumpLimits limits = _pumpLimits;
maxAccepts = limits.MaxOutstandingAccepts;
maxRequests = limits.MaxOutstandingRequests;
}
/// <summary>
/// Sets the maximum number of requests that will be queued up in Http.Sys.
/// </summary>
@ -392,33 +333,13 @@ namespace Microsoft.AspNet.Server.WebListener
}
}
internal void Start(AppFunc app, IList<IDictionary<string, object>> addresses, IDictionary<string, object> capabilities, LoggerFactoryFunc loggerFactory)
internal void Start()
{
CheckDisposed();
// Can't call Start twice
Contract.Assert(_appFunc == null);
Contract.Assert(app != null);
Contract.Assert(addresses != null);
Contract.Assert(capabilities != null);
_appFunc = app;
_capabilities = capabilities;
_logger = LogHelper.CreateLogger(loggerFactory, typeof(OwinWebListener));
// TODO: _logger = LogHelper.CreateLogger(loggerFactory, typeof(OwinWebListener));
LogHelper.LogInfo(_logger, "Start");
foreach (var address in addresses)
{
// Build addresses from parts
var scheme = address.Get<string>("scheme") ?? Constants.HttpScheme;
var host = address.Get<string>("host") ?? "localhost";
var port = address.Get<string>("port") ?? "5000";
var path = address.Get<string>("path") ?? string.Empty;
Prefix prefix = Prefix.Create(scheme, host, port, path);
_uriPrefixes.Add(prefix);
}
// Make sure there are no race conditions between Start/Stop/Abort/Close/Dispose and
// calls to SetupV2Config: Start needs to setup all resources (esp. in V2 where besides
// the request handle, there is also a server session and a Url group. Abort/Stop must
@ -456,8 +377,6 @@ namespace Microsoft.AspNet.Server.WebListener
_state = State.Started;
SetRequestQueueLimit();
ActivateRequestProcessingLimits();
}
catch (Exception exception)
{
@ -471,84 +390,6 @@ namespace Microsoft.AspNet.Server.WebListener
}
}
// The message pump.
// When we start listening for the next request on one thread, we may need to be sure that the
// completion continues on another thread as to not block the current request processing.
// The awaits will manage stack depth for us.
private async void ProcessRequestsWorker()
{
int workerIndex = Interlocked.Increment(ref _acceptorCounts);
while (IsListening && workerIndex <= _pumpLimits.MaxOutstandingAccepts)
{
await _requestProcessingThrottle;
// Receive a request
RequestContext requestContext;
try
{
requestContext = await GetContextAsync().SupressContext();
}
catch (Exception exception)
{
LogHelper.LogException(_logger, "ListenForNextRequestAsync", exception);
Contract.Assert(!IsListening);
return;
}
try
{
Task.Factory.StartNew(_processRequest, requestContext);
}
catch (Exception ex)
{
// Request processing failed to be queued in threadpool
// Log the error message, release throttle and move on
LogHelper.LogException(_logger, "ProcessRequestAsync", ex);
_requestProcessingThrottle.Release();
}
}
Interlocked.Decrement(ref _acceptorCounts);
}
private async void ProcessRequestAsync(object requestContextObj)
{
var requestContext = requestContextObj as RequestContext;
try
{
try
{
// TODO: Make disconnect registration lazy
RegisterForDisconnectNotification(requestContext);
FeatureContext featureContext = new FeatureContext(requestContext);
await _appFunc(featureContext.Features).SupressContext();
await requestContext.ProcessResponseAsync().SupressContext();
}
catch (Exception ex)
{
LogHelper.LogException(_logger, "ProcessRequestAsync", ex);
if (requestContext.Response.SentHeaders)
{
requestContext.Abort();
}
else
{
// We haven't sent a response yet, try to send a 500 Internal Server Error
requestContext.SetFatalResponse();
}
}
requestContext.Dispose();
}
catch (Exception ex)
{
LogHelper.LogException(_logger, "ProcessRequestAsync", ex);
requestContext.Abort();
requestContext.Dispose();
}
finally
{
_requestProcessingThrottle.Release();
}
}
private void CleanupV2Config()
{
// If we never setup V2, just return.
@ -829,20 +670,18 @@ namespace Microsoft.AspNet.Server.WebListener
return asyncResult.Task;
}
private void RegisterForDisconnectNotification(RequestContext requestContext)
internal CancellationToken RegisterForDisconnectNotification(RequestContext requestContext)
{
try
{
// Create exactly one CancellationToken per connection.
ulong connectionId = requestContext.Request.ConnectionId;
CancellationToken ct = GetConnectionCancellation(connectionId);
requestContext.Request.RegisterForDisconnect(ct);
// TODO: Need a feature equivalent for owin.CallCancelled.
// requestContext.Environment.ConnectionDisconnect = ct;
return GetConnectionCancellation(connectionId);
}
catch (Win32Exception exception)
{
LogHelper.LogException(_logger, "RegisterForDisconnectNotification", exception);
return CancellationToken.None;
}
}

Просмотреть файл

@ -56,7 +56,6 @@ namespace Microsoft.AspNet.Server.WebListener
private IPrincipal _user;
private bool _isDisposed = false;
private CancellationTokenRegistration _disconnectRegistration;
internal unsafe Request(RequestContext httpContext, NativeRequestContext memoryBlob)
{
@ -511,7 +510,6 @@ namespace Microsoft.AspNet.Server.WebListener
memoryBlob.Dispose();
_nativeRequestContext = null;
}
_disconnectRegistration.Dispose();
if (_nativeStream != null)
{
_nativeStream.Dispose();
@ -533,17 +531,5 @@ namespace Microsoft.AspNet.Server.WebListener
_nativeStream = new RequestStream(RequestContext);
}
}
internal void RegisterForDisconnect(CancellationToken cancellationToken)
{
_disconnectRegistration = cancellationToken.Register(Cancel, this);
}
private static void Cancel(object obj)
{
Request request = (Request)obj;
// Cancels owin.CallCanceled
request.RequestContext.Abort();
}
}
}

Просмотреть файл

@ -28,6 +28,8 @@ namespace Microsoft.AspNet.Server.WebListener
private NativeRequestContext _memoryBlob;
private OpaqueFunc _opaqueCallback;
private bool _disposed;
private CancellationTokenRegistration? _disconnectRegistration;
private CancellationToken? _disconnectToken;
internal RequestContext(OwinWebListener httpListener, NativeRequestContext memoryBlob)
{
@ -56,6 +58,28 @@ namespace Microsoft.AspNet.Server.WebListener
}
}
internal CancellationToken DisconnectToken
{
get
{
if (!_disconnectToken.HasValue)
{
_disconnectToken = _server.RegisterForDisconnectNotification(this);
if (_disconnectToken.Value.CanBeCanceled)
{
_disconnectRegistration = _disconnectToken.Value.Register(Cancel, this);
}
}
return _disconnectToken.Value;
}
}
private static void Cancel(object obj)
{
RequestContext context = (RequestContext)obj;
context.Abort();
}
internal OwinWebListener Server
{
get
@ -113,6 +137,10 @@ namespace Microsoft.AspNet.Server.WebListener
// TODO: Verbose log
try
{
if (_disconnectRegistration.HasValue)
{
_disconnectRegistration.Value.Dispose();
}
_response.Dispose();
}
finally

Просмотреть файл

@ -117,7 +117,7 @@ namespace Microsoft.AspNet.Server.WebListener
_fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize,
FileOptions.Asynchronous | FileOptions.SequentialScan); // Extremely expensive.
#else
_fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize, useAsync: true); // Extremely expensive.
_fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize /*, useAsync: true*/); // Extremely expensive.
#endif
#if !NET45
throw new NotImplementedException();

Просмотреть файл

@ -74,10 +74,11 @@ namespace Microsoft.AspNet.Server.WebListener
OwinWebListener server = (OwinWebListener)serverConfig.AdvancedConfiguration;
var capabilities = new Dictionary<string, object>();
// TODO: var capabilities = new Dictionary<string, object>();
WebListenerWrapper wrapper = new WebListenerWrapper(server);
server.Start(app, serverConfig.Addresses, capabilities, _loggerFactory);
return server;
wrapper.Start(app, serverConfig.Addresses, _loggerFactory);
return wrapper;
}
}
}

Просмотреть файл

@ -0,0 +1,192 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Diagnostics;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.WebListener
{
using AppFunc = Func<object, Task>;
using LoggerFactoryFunc = Func<string, Func<TraceEventType, int, object, Exception, Func<object, Exception, string>, bool>>;
using LoggerFunc = Func<TraceEventType, int, object, Exception, Func<object, Exception, string>, bool>;
using System.Threading;
public class WebListenerWrapper : IDisposable
{
private static readonly int DefaultMaxAccepts = 5 * Environment.ProcessorCount;
private static readonly int DefaultMaxRequests = Int32.MaxValue;
private OwinWebListener _listener;
private AppFunc _appFunc;
private LoggerFunc _logger;
private PumpLimits _pumpLimits;
private int _acceptorCounts;
private Action<object> _processRequest;
private readonly AwaitableThrottle _requestProcessingThrottle;
// TODO: private IDictionary<string, object> _capabilities;
internal WebListenerWrapper(OwinWebListener listener)
{
Contract.Assert(listener != null);
_listener = listener;
_processRequest = new Action<object>(ProcessRequestAsync);
_pumpLimits = new PumpLimits(DefaultMaxAccepts, DefaultMaxRequests);
_requestProcessingThrottle = new AwaitableThrottle(DefaultMaxRequests);
}
internal void Start(AppFunc app, IList<IDictionary<string, object>> addresses, LoggerFactoryFunc loggerFactory)
{
// Can't call Start twice
Contract.Assert(_appFunc == null);
Contract.Assert(app != null);
Contract.Assert(addresses != null);
_appFunc = app;
_logger = LogHelper.CreateLogger(loggerFactory, typeof(WebListenerWrapper));
LogHelper.LogInfo(_logger, "Start");
foreach (var address in addresses)
{
// Build addresses from parts
var scheme = address.Get<string>("scheme") ?? Constants.HttpScheme;
var host = address.Get<string>("host") ?? "localhost";
var port = address.Get<string>("port") ?? "5000";
var path = address.Get<string>("path") ?? string.Empty;
Prefix prefix = Prefix.Create(scheme, host, port, path);
_listener.UriPrefixes.Add(prefix);
}
_listener.Start();
ActivateRequestProcessingLimits();
}
/// <summary>
/// These are merged as one operation because they should be swapped out atomically.
/// This controls how many requests the server attempts to process concurrently.
/// </summary>
/// <param name="maxAccepts">The maximum number of pending accepts.</param>
/// <param name="maxRequests">The maximum number of outstanding requests.</param>
public void SetRequestProcessingLimits(int maxAccepts, int maxRequests)
{
_pumpLimits = new PumpLimits(maxAccepts, maxRequests);
if (_listener.IsListening)
{
ActivateRequestProcessingLimits();
}
}
private void ActivateRequestProcessingLimits()
{
_requestProcessingThrottle.MaxConcurrent = _pumpLimits.MaxOutstandingRequests;
for (int i = _acceptorCounts; i < _pumpLimits.MaxOutstandingAccepts; i++)
{
ProcessRequestsWorker();
}
}
/// <summary>
/// Gets the request processing limits.
/// </summary>
/// <param name="maxAccepts">The maximum number of pending accepts.</param>
/// <param name="maxRequests">The maximum number of outstanding requests.</param>
[SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "0#", Justification = "By design")]
[SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "1#", Justification = "By design")]
public void GetRequestProcessingLimits(out int maxAccepts, out int maxRequests)
{
PumpLimits limits = _pumpLimits;
maxAccepts = limits.MaxOutstandingAccepts;
maxRequests = limits.MaxOutstandingRequests;
}
// The message pump.
// When we start listening for the next request on one thread, we may need to be sure that the
// completion continues on another thread as to not block the current request processing.
// The awaits will manage stack depth for us.
private async void ProcessRequestsWorker()
{
int workerIndex = Interlocked.Increment(ref _acceptorCounts);
while (_listener.IsListening && workerIndex <= _pumpLimits.MaxOutstandingAccepts)
{
await _requestProcessingThrottle;
// Receive a request
RequestContext requestContext;
try
{
requestContext = await _listener.GetContextAsync().SupressContext();
}
catch (Exception exception)
{
LogHelper.LogException(_logger, "ListenForNextRequestAsync", exception);
Contract.Assert(!_listener.IsListening);
return;
}
try
{
Task.Factory.StartNew(_processRequest, requestContext);
}
catch (Exception ex)
{
// Request processing failed to be queued in threadpool
// Log the error message, release throttle and move on
LogHelper.LogException(_logger, "ProcessRequestAsync", ex);
_requestProcessingThrottle.Release();
}
}
Interlocked.Decrement(ref _acceptorCounts);
}
private async void ProcessRequestAsync(object requestContextObj)
{
var requestContext = requestContextObj as RequestContext;
try
{
try
{
// TODO: Make disconnect registration lazy
FeatureContext featureContext = new FeatureContext(requestContext);
await _appFunc(featureContext.Features).SupressContext();
await requestContext.ProcessResponseAsync().SupressContext();
}
catch (Exception ex)
{
LogHelper.LogException(_logger, "ProcessRequestAsync", ex);
if (requestContext.Response.SentHeaders)
{
requestContext.Abort();
}
else
{
// We haven't sent a response yet, try to send a 500 Internal Server Error
requestContext.SetFatalResponse();
}
}
requestContext.Dispose();
}
catch (Exception ex)
{
LogHelper.LogException(_logger, "ProcessRequestAsync", ex);
requestContext.Abort();
requestContext.Dispose();
}
finally
{
_requestProcessingThrottle.Release();
}
}
public void Dispose()
{
_listener.Dispose();
}
}
}