diff --git a/src/Microsoft.AspNet.Server.WebListener/OwinWebListener.cs b/src/Microsoft.AspNet.Server.WebListener/OwinWebListener.cs index 460c7d9..dbe4fe4 100644 --- a/src/Microsoft.AspNet.Server.WebListener/OwinWebListener.cs +++ b/src/Microsoft.AspNet.Server.WebListener/OwinWebListener.cs @@ -40,8 +40,6 @@ namespace Microsoft.AspNet.Server.WebListener private static readonly int BindingInfoSize = Marshal.SizeOf(); #endif - private static readonly int DefaultMaxAccepts = 5 * Environment.ProcessorCount; - private static readonly int DefaultMaxRequests = Int32.MaxValue; // Win8# 559317 fixed a bug in Http.sys's HttpReceiveClientCertificate method. // Without this fix IOCP callbacks were not being called although ERROR_IO_PENDING was @@ -60,8 +58,6 @@ namespace Microsoft.AspNet.Server.WebListener private readonly ConcurrentDictionary _connectionCancellationTokens; - private AppFunc _appFunc; - private IDictionary _capabilities; private LoggerFunc _logger; private SafeHandle _requestQueueHandle; @@ -77,11 +73,6 @@ namespace Microsoft.AspNet.Server.WebListener private List _uriPrefixes = new List(); - private PumpLimits _pumpLimits; - private int _acceptorCounts; - private Action _processRequest; - private readonly AwaitableThrottle _requestProcessingThrottle; - // The native request queue private long? _requestQueueLength; @@ -101,11 +92,6 @@ namespace Microsoft.AspNet.Server.WebListener _timeoutManager = new TimeoutManager(this); _authManager = new AuthenticationManager(this); _connectionCancellationTokens = new ConcurrentDictionary(); - - _processRequest = new Action(ProcessRequestAsync); - - _pumpLimits = new PumpLimits(DefaultMaxAccepts, DefaultMaxRequests); - _requestProcessingThrottle = new AwaitableThrottle(DefaultMaxRequests); } internal enum State @@ -125,11 +111,6 @@ namespace Microsoft.AspNet.Server.WebListener get { return _uriPrefixes; } } - internal IDictionary Capabilities - { - get { return _capabilities; } - } - internal SafeHandle RequestQueueHandle { get @@ -190,46 +171,6 @@ namespace Microsoft.AspNet.Server.WebListener } } - /// - /// These are merged as one operation because they should be swapped out atomically. - /// This controls how many requests the server attempts to process concurrently. - /// - /// The maximum number of pending accepts. - /// The maximum number of outstanding requests. - public void SetRequestProcessingLimits(int maxAccepts, int maxRequests) - { - _pumpLimits = new PumpLimits(maxAccepts, maxRequests); - - if (_state == State.Started) - { - ActivateRequestProcessingLimits(); - } - } - - private void ActivateRequestProcessingLimits() - { - _requestProcessingThrottle.MaxConcurrent = _pumpLimits.MaxOutstandingRequests; - - for (int i = _acceptorCounts; i < _pumpLimits.MaxOutstandingAccepts; i++) - { - ProcessRequestsWorker(); - } - } - - /// - /// Gets the request processing limits. - /// - /// The maximum number of pending accepts. - /// The maximum number of outstanding requests. - [SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "0#", Justification = "By design")] - [SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "1#", Justification = "By design")] - public void GetRequestProcessingLimits(out int maxAccepts, out int maxRequests) - { - PumpLimits limits = _pumpLimits; - maxAccepts = limits.MaxOutstandingAccepts; - maxRequests = limits.MaxOutstandingRequests; - } - /// /// Sets the maximum number of requests that will be queued up in Http.Sys. /// @@ -392,33 +333,13 @@ namespace Microsoft.AspNet.Server.WebListener } } - internal void Start(AppFunc app, IList> addresses, IDictionary capabilities, LoggerFactoryFunc loggerFactory) + internal void Start() { CheckDisposed(); - // Can't call Start twice - Contract.Assert(_appFunc == null); - Contract.Assert(app != null); - Contract.Assert(addresses != null); - Contract.Assert(capabilities != null); - - _appFunc = app; - _capabilities = capabilities; - _logger = LogHelper.CreateLogger(loggerFactory, typeof(OwinWebListener)); + // TODO: _logger = LogHelper.CreateLogger(loggerFactory, typeof(OwinWebListener)); LogHelper.LogInfo(_logger, "Start"); - foreach (var address in addresses) - { - // Build addresses from parts - var scheme = address.Get("scheme") ?? Constants.HttpScheme; - var host = address.Get("host") ?? "localhost"; - var port = address.Get("port") ?? "5000"; - var path = address.Get("path") ?? string.Empty; - - Prefix prefix = Prefix.Create(scheme, host, port, path); - _uriPrefixes.Add(prefix); - } - // Make sure there are no race conditions between Start/Stop/Abort/Close/Dispose and // calls to SetupV2Config: Start needs to setup all resources (esp. in V2 where besides // the request handle, there is also a server session and a Url group. Abort/Stop must @@ -456,8 +377,6 @@ namespace Microsoft.AspNet.Server.WebListener _state = State.Started; SetRequestQueueLimit(); - - ActivateRequestProcessingLimits(); } catch (Exception exception) { @@ -471,84 +390,6 @@ namespace Microsoft.AspNet.Server.WebListener } } - // The message pump. - // When we start listening for the next request on one thread, we may need to be sure that the - // completion continues on another thread as to not block the current request processing. - // The awaits will manage stack depth for us. - private async void ProcessRequestsWorker() - { - int workerIndex = Interlocked.Increment(ref _acceptorCounts); - while (IsListening && workerIndex <= _pumpLimits.MaxOutstandingAccepts) - { - await _requestProcessingThrottle; - - // Receive a request - RequestContext requestContext; - try - { - requestContext = await GetContextAsync().SupressContext(); - } - catch (Exception exception) - { - LogHelper.LogException(_logger, "ListenForNextRequestAsync", exception); - Contract.Assert(!IsListening); - return; - } - try - { - Task.Factory.StartNew(_processRequest, requestContext); - } - catch (Exception ex) - { - // Request processing failed to be queued in threadpool - // Log the error message, release throttle and move on - LogHelper.LogException(_logger, "ProcessRequestAsync", ex); - _requestProcessingThrottle.Release(); - } - } - Interlocked.Decrement(ref _acceptorCounts); - } - - private async void ProcessRequestAsync(object requestContextObj) - { - var requestContext = requestContextObj as RequestContext; - try - { - try - { - // TODO: Make disconnect registration lazy - RegisterForDisconnectNotification(requestContext); - FeatureContext featureContext = new FeatureContext(requestContext); - await _appFunc(featureContext.Features).SupressContext(); - await requestContext.ProcessResponseAsync().SupressContext(); - } - catch (Exception ex) - { - LogHelper.LogException(_logger, "ProcessRequestAsync", ex); - if (requestContext.Response.SentHeaders) - { - requestContext.Abort(); - } - else - { - // We haven't sent a response yet, try to send a 500 Internal Server Error - requestContext.SetFatalResponse(); - } - } - requestContext.Dispose(); - } - catch (Exception ex) - { - LogHelper.LogException(_logger, "ProcessRequestAsync", ex); - requestContext.Abort(); - requestContext.Dispose(); - } - finally - { - _requestProcessingThrottle.Release(); - } - } - private void CleanupV2Config() { // If we never setup V2, just return. @@ -829,20 +670,18 @@ namespace Microsoft.AspNet.Server.WebListener return asyncResult.Task; } - private void RegisterForDisconnectNotification(RequestContext requestContext) + internal CancellationToken RegisterForDisconnectNotification(RequestContext requestContext) { try { // Create exactly one CancellationToken per connection. ulong connectionId = requestContext.Request.ConnectionId; - CancellationToken ct = GetConnectionCancellation(connectionId); - requestContext.Request.RegisterForDisconnect(ct); - // TODO: Need a feature equivalent for owin.CallCancelled. - // requestContext.Environment.ConnectionDisconnect = ct; + return GetConnectionCancellation(connectionId); } catch (Win32Exception exception) { LogHelper.LogException(_logger, "RegisterForDisconnectNotification", exception); + return CancellationToken.None; } } diff --git a/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/Request.cs b/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/Request.cs index 0a3a186..78049af 100644 --- a/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/Request.cs +++ b/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/Request.cs @@ -56,7 +56,6 @@ namespace Microsoft.AspNet.Server.WebListener private IPrincipal _user; private bool _isDisposed = false; - private CancellationTokenRegistration _disconnectRegistration; internal unsafe Request(RequestContext httpContext, NativeRequestContext memoryBlob) { @@ -511,7 +510,6 @@ namespace Microsoft.AspNet.Server.WebListener memoryBlob.Dispose(); _nativeRequestContext = null; } - _disconnectRegistration.Dispose(); if (_nativeStream != null) { _nativeStream.Dispose(); @@ -533,17 +531,5 @@ namespace Microsoft.AspNet.Server.WebListener _nativeStream = new RequestStream(RequestContext); } } - - internal void RegisterForDisconnect(CancellationToken cancellationToken) - { - _disconnectRegistration = cancellationToken.Register(Cancel, this); - } - - private static void Cancel(object obj) - { - Request request = (Request)obj; - // Cancels owin.CallCanceled - request.RequestContext.Abort(); - } } } diff --git a/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/RequestContext.cs b/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/RequestContext.cs index 654772e..8c4b8d3 100644 --- a/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/RequestContext.cs +++ b/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/RequestContext.cs @@ -28,6 +28,8 @@ namespace Microsoft.AspNet.Server.WebListener private NativeRequestContext _memoryBlob; private OpaqueFunc _opaqueCallback; private bool _disposed; + private CancellationTokenRegistration? _disconnectRegistration; + private CancellationToken? _disconnectToken; internal RequestContext(OwinWebListener httpListener, NativeRequestContext memoryBlob) { @@ -56,6 +58,28 @@ namespace Microsoft.AspNet.Server.WebListener } } + internal CancellationToken DisconnectToken + { + get + { + if (!_disconnectToken.HasValue) + { + _disconnectToken = _server.RegisterForDisconnectNotification(this); + if (_disconnectToken.Value.CanBeCanceled) + { + _disconnectRegistration = _disconnectToken.Value.Register(Cancel, this); + } + } + return _disconnectToken.Value; + } + } + + private static void Cancel(object obj) + { + RequestContext context = (RequestContext)obj; + context.Abort(); + } + internal OwinWebListener Server { get @@ -113,6 +137,10 @@ namespace Microsoft.AspNet.Server.WebListener // TODO: Verbose log try { + if (_disconnectRegistration.HasValue) + { + _disconnectRegistration.Value.Dispose(); + } _response.Dispose(); } finally diff --git a/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/ResponseStreamAsyncResult.cs b/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/ResponseStreamAsyncResult.cs index ca38ba6..42b8122 100644 --- a/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/ResponseStreamAsyncResult.cs +++ b/src/Microsoft.AspNet.Server.WebListener/RequestProcessing/ResponseStreamAsyncResult.cs @@ -117,7 +117,7 @@ namespace Microsoft.AspNet.Server.WebListener _fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize, FileOptions.Asynchronous | FileOptions.SequentialScan); // Extremely expensive. #else - _fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize, useAsync: true); // Extremely expensive. + _fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize /*, useAsync: true*/); // Extremely expensive. #endif #if !NET45 throw new NotImplementedException(); diff --git a/src/Microsoft.AspNet.Server.WebListener/ServerFactory.cs b/src/Microsoft.AspNet.Server.WebListener/ServerFactory.cs index 9a3fa32..fc06747 100644 --- a/src/Microsoft.AspNet.Server.WebListener/ServerFactory.cs +++ b/src/Microsoft.AspNet.Server.WebListener/ServerFactory.cs @@ -74,10 +74,11 @@ namespace Microsoft.AspNet.Server.WebListener OwinWebListener server = (OwinWebListener)serverConfig.AdvancedConfiguration; - var capabilities = new Dictionary(); + // TODO: var capabilities = new Dictionary(); + WebListenerWrapper wrapper = new WebListenerWrapper(server); - server.Start(app, serverConfig.Addresses, capabilities, _loggerFactory); - return server; + wrapper.Start(app, serverConfig.Addresses, _loggerFactory); + return wrapper; } } } diff --git a/src/Microsoft.AspNet.Server.WebListener/WebListenerWrapper.cs b/src/Microsoft.AspNet.Server.WebListener/WebListenerWrapper.cs new file mode 100644 index 0000000..3cd482c --- /dev/null +++ b/src/Microsoft.AspNet.Server.WebListener/WebListenerWrapper.cs @@ -0,0 +1,192 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.Contracts; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.WebListener +{ + using AppFunc = Func; + using LoggerFactoryFunc = Func, bool>>; + using LoggerFunc = Func, bool>; + using System.Threading; + + public class WebListenerWrapper : IDisposable + { + private static readonly int DefaultMaxAccepts = 5 * Environment.ProcessorCount; + private static readonly int DefaultMaxRequests = Int32.MaxValue; + + private OwinWebListener _listener; + private AppFunc _appFunc; + private LoggerFunc _logger; + + private PumpLimits _pumpLimits; + private int _acceptorCounts; + private Action _processRequest; + private readonly AwaitableThrottle _requestProcessingThrottle; + + // TODO: private IDictionary _capabilities; + + internal WebListenerWrapper(OwinWebListener listener) + { + Contract.Assert(listener != null); + _listener = listener; + + _processRequest = new Action(ProcessRequestAsync); + _pumpLimits = new PumpLimits(DefaultMaxAccepts, DefaultMaxRequests); + _requestProcessingThrottle = new AwaitableThrottle(DefaultMaxRequests); + } + + internal void Start(AppFunc app, IList> addresses, LoggerFactoryFunc loggerFactory) + { + // Can't call Start twice + Contract.Assert(_appFunc == null); + + Contract.Assert(app != null); + Contract.Assert(addresses != null); + + _appFunc = app; + _logger = LogHelper.CreateLogger(loggerFactory, typeof(WebListenerWrapper)); + LogHelper.LogInfo(_logger, "Start"); + + foreach (var address in addresses) + { + // Build addresses from parts + var scheme = address.Get("scheme") ?? Constants.HttpScheme; + var host = address.Get("host") ?? "localhost"; + var port = address.Get("port") ?? "5000"; + var path = address.Get("path") ?? string.Empty; + + Prefix prefix = Prefix.Create(scheme, host, port, path); + _listener.UriPrefixes.Add(prefix); + } + + _listener.Start(); + + ActivateRequestProcessingLimits(); + } + + /// + /// These are merged as one operation because they should be swapped out atomically. + /// This controls how many requests the server attempts to process concurrently. + /// + /// The maximum number of pending accepts. + /// The maximum number of outstanding requests. + public void SetRequestProcessingLimits(int maxAccepts, int maxRequests) + { + _pumpLimits = new PumpLimits(maxAccepts, maxRequests); + + if (_listener.IsListening) + { + ActivateRequestProcessingLimits(); + } + } + + private void ActivateRequestProcessingLimits() + { + _requestProcessingThrottle.MaxConcurrent = _pumpLimits.MaxOutstandingRequests; + + for (int i = _acceptorCounts; i < _pumpLimits.MaxOutstandingAccepts; i++) + { + ProcessRequestsWorker(); + } + } + + /// + /// Gets the request processing limits. + /// + /// The maximum number of pending accepts. + /// The maximum number of outstanding requests. + [SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "0#", Justification = "By design")] + [SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "1#", Justification = "By design")] + public void GetRequestProcessingLimits(out int maxAccepts, out int maxRequests) + { + PumpLimits limits = _pumpLimits; + maxAccepts = limits.MaxOutstandingAccepts; + maxRequests = limits.MaxOutstandingRequests; + } + + // The message pump. + // When we start listening for the next request on one thread, we may need to be sure that the + // completion continues on another thread as to not block the current request processing. + // The awaits will manage stack depth for us. + private async void ProcessRequestsWorker() + { + int workerIndex = Interlocked.Increment(ref _acceptorCounts); + while (_listener.IsListening && workerIndex <= _pumpLimits.MaxOutstandingAccepts) + { + await _requestProcessingThrottle; + + // Receive a request + RequestContext requestContext; + try + { + requestContext = await _listener.GetContextAsync().SupressContext(); + } + catch (Exception exception) + { + LogHelper.LogException(_logger, "ListenForNextRequestAsync", exception); + Contract.Assert(!_listener.IsListening); + return; + } + try + { + Task.Factory.StartNew(_processRequest, requestContext); + } + catch (Exception ex) + { + // Request processing failed to be queued in threadpool + // Log the error message, release throttle and move on + LogHelper.LogException(_logger, "ProcessRequestAsync", ex); + _requestProcessingThrottle.Release(); + } + } + Interlocked.Decrement(ref _acceptorCounts); + } + + private async void ProcessRequestAsync(object requestContextObj) + { + var requestContext = requestContextObj as RequestContext; + try + { + try + { + // TODO: Make disconnect registration lazy + FeatureContext featureContext = new FeatureContext(requestContext); + await _appFunc(featureContext.Features).SupressContext(); + await requestContext.ProcessResponseAsync().SupressContext(); + } + catch (Exception ex) + { + LogHelper.LogException(_logger, "ProcessRequestAsync", ex); + if (requestContext.Response.SentHeaders) + { + requestContext.Abort(); + } + else + { + // We haven't sent a response yet, try to send a 500 Internal Server Error + requestContext.SetFatalResponse(); + } + } + requestContext.Dispose(); + } + catch (Exception ex) + { + LogHelper.LogException(_logger, "ProcessRequestAsync", ex); + requestContext.Abort(); + requestContext.Dispose(); + } + finally + { + _requestProcessingThrottle.Release(); + } + } + + public void Dispose() + { + _listener.Dispose(); + } + } +}