#108 Make flush work for response compression

This commit is contained in:
Chris R 2016-10-04 13:10:06 -07:00
Родитель 7be02fa146
Коммит 3cb0fc640a
9 изменённых файлов: 458 добавлений и 28 удалений

Просмотреть файл

@ -7,6 +7,8 @@ namespace ResponseCompressionSample
{
public string EncodingName => "custom";
public bool SupportsFlush => true;
public Stream CreateStream(Stream outputStream)
{
// Create a custom compression stream wrapper here

Просмотреть файл

@ -1,11 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.ResponseCompression;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace ResponseCompressionSample
{
@ -22,6 +26,23 @@ namespace ResponseCompressionSample
{
app.UseResponseCompression();
app.Map("/trickle", trickleApp =>
{
trickleApp.Run(async context =>
{
context.Response.ContentType = "text/plain";
// Disables compression on net451 because that GZipStream does not implement Flush.
context.Features.Get<IHttpBufferingFeature>()?.DisableResponseBuffering();
for (int i = 0; i < 100; i++)
{
await context.Response.WriteAsync("a");
await context.Response.Body.FlushAsync();
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
});
app.Run(async context =>
{
context.Response.ContentType = "text/plain";
@ -32,7 +53,14 @@ namespace ResponseCompressionSample
public static void Main(string[] args)
{
var host = new WebHostBuilder()
.UseKestrel()
.UseKestrel(options =>
{
options.UseConnectionLogging();
})
.ConfigureLogging(factory =>
{
factory.AddConsole(LogLevel.Debug);
})
.UseStartup<Startup>()
.Build();

Просмотреть файл

@ -1,7 +1,8 @@
{
"dependencies": {
"Microsoft.AspNetCore.ResponseCompression": "0.1.0-*",
"Microsoft.AspNetCore.Server.Kestrel": "1.1.0-*"
"Microsoft.AspNetCore.Server.Kestrel": "1.1.0-*",
"Microsoft.Extensions.Logging.Console": "1.1.0-*"
},
"buildOptions": {
"emitEntryPoint": true

Просмотреть файл

@ -6,6 +6,7 @@ using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Net.Http.Headers;
namespace Microsoft.AspNetCore.ResponseCompression
@ -13,7 +14,7 @@ namespace Microsoft.AspNetCore.ResponseCompression
/// <summary>
/// Stream wrapper that create specific compression stream only if necessary.
/// </summary>
internal class BodyWrapperStream : Stream
internal class BodyWrapperStream : Stream, IHttpBufferingFeature
{
private readonly HttpResponse _response;
@ -23,16 +24,20 @@ namespace Microsoft.AspNetCore.ResponseCompression
private readonly ICompressionProvider _compressionProvider;
private readonly IHttpBufferingFeature _innerBufferFeature;
private bool _compressionChecked = false;
private Stream _compressionStream = null;
internal BodyWrapperStream(HttpResponse response, Stream bodyOriginalStream, IResponseCompressionProvider provider, ICompressionProvider compressionProvider)
internal BodyWrapperStream(HttpResponse response, Stream bodyOriginalStream, IResponseCompressionProvider provider, ICompressionProvider compressionProvider,
IHttpBufferingFeature innerBufferFeature)
{
_response = response;
_bodyOriginalStream = bodyOriginalStream;
_provider = provider;
_compressionProvider = compressionProvider;
_innerBufferFeature = innerBufferFeature;
}
protected override void Dispose(bool disposing)
@ -63,7 +68,14 @@ namespace Microsoft.AspNetCore.ResponseCompression
public override void Flush()
{
OnWrite();
if (!_compressionChecked)
{
OnWrite();
// Flush the original stream to send the headers. Flushing the compression stream won't
// flush the original stream if no data has been written yet.
_bodyOriginalStream.Flush();
return;
}
if (_compressionStream != null)
{
@ -77,12 +89,19 @@ namespace Microsoft.AspNetCore.ResponseCompression
public override Task FlushAsync(CancellationToken cancellationToken)
{
OnWrite();
if (!_compressionChecked)
{
OnWrite();
// Flush the original stream to send the headers. Flushing the compression stream won't
// flush the original stream if no data has been written yet.
return _bodyOriginalStream.FlushAsync(cancellationToken);
}
if (_compressionStream != null)
{
return _compressionStream.FlushAsync(cancellationToken);
}
return _bodyOriginalStream.FlushAsync(cancellationToken);
}
@ -129,7 +148,10 @@ namespace Microsoft.AspNetCore.ResponseCompression
public override void EndWrite(IAsyncResult asyncResult)
{
OnWrite();
if (!_compressionChecked)
{
throw new InvalidOperationException("BeginWrite was not called before EndWrite");
}
if (_compressionStream != null)
{
@ -175,5 +197,24 @@ namespace Microsoft.AspNetCore.ResponseCompression
return !_response.Headers.ContainsKey(HeaderNames.ContentRange) && // The response is not partial
_provider.ShouldCompressResponse(_response.HttpContext);
}
public void DisableRequestBuffering()
{
// Unrelated
_innerBufferFeature?.DisableRequestBuffering();
}
// For this to be effective it needs to be called before the first write.
public void DisableResponseBuffering()
{
if (!_compressionProvider.SupportsFlush)
{
// Don't compress, some of the providers don't implement Flush (e.g. .NET 4.5.1 GZip/Deflate stream)
// which would block real-time responses like SignalR.
_compressionChecked = true;
}
_innerBufferFeature?.DisableResponseBuffering();
}
}
}

Просмотреть файл

@ -11,16 +11,24 @@ namespace Microsoft.AspNetCore.ResponseCompression
/// </summary>
public class GzipCompressionProvider : ICompressionProvider
{
/// <summary>
/// Initialize a new <see cref="GzipCompressionProvider"/>.
/// </summary>
public GzipCompressionProvider()
{
}
/// <inheritdoc />
public string EncodingName => "gzip";
/// <inheritdoc />
public bool SupportsFlush
{
get
{
#if NET451
return false;
#elif NETSTANDARD1_3
return true;
#else
// Not implemented, compiler break
#endif
}
}
/// <summary>
/// What level of compression to use for the stream.
/// </summary>

Просмотреть файл

@ -15,6 +15,11 @@ namespace Microsoft.AspNetCore.ResponseCompression
/// </summary>
string EncodingName { get; }
/// <summary>
/// Indicates if the given provider supports Flush and FlushAsync. If not, compression may be disabled in some scenarios.
/// </summary>
bool SupportsFlush { get; }
/// <summary>
/// Create a new compression stream.
/// </summary>

Просмотреть файл

@ -4,6 +4,7 @@
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
namespace Microsoft.AspNetCore.ResponseCompression
@ -39,6 +40,7 @@ namespace Microsoft.AspNetCore.ResponseCompression
{
throw new ArgumentNullException(nameof(options));
}
_next = next;
_provider = provider;
_enableHttps = options.Value.EnableHttps;
@ -65,19 +67,23 @@ namespace Microsoft.AspNetCore.ResponseCompression
}
var bodyStream = context.Response.Body;
var originalBufferFeature = context.Features.Get<IHttpBufferingFeature>();
using (var bodyWrapperStream = new BodyWrapperStream(context.Response, bodyStream, _provider, compressionProvider))
var bodyWrapperStream = new BodyWrapperStream(context.Response, bodyStream, _provider, compressionProvider, originalBufferFeature);
context.Response.Body = bodyWrapperStream;
context.Features.Set<IHttpBufferingFeature>(bodyWrapperStream);
try
{
context.Response.Body = bodyWrapperStream;
try
{
await _next(context);
}
finally
{
context.Response.Body = bodyStream;
}
await _next(context);
// This is not disposed via a using statement because we don't want to flush the compression buffer for unhandled exceptions,
// that may cause secondary exceptions.
bodyWrapperStream.Dispose();
}
finally
{
context.Response.Body = bodyStream;
context.Features.Set(originalBufferFeature);
}
}
}

Просмотреть файл

@ -108,6 +108,7 @@ namespace Microsoft.AspNetCore.ResponseCompression
mimeType = mimeType.Trim();
}
// TODO PERF: StringSegments?
return _mimeTypes.Contains(mimeType);
}
}

Просмотреть файл

@ -4,10 +4,12 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.TestHost;
using Microsoft.Net.Http.Headers;
using Xunit;
@ -35,7 +37,7 @@ namespace Microsoft.AspNetCore.ResponseCompression.Tests
}
[Fact]
public async Task Request_AcceptGzipDeflate_ComrpessedGzip()
public async Task Request_AcceptGzipDeflate_CompressedGzip()
{
var response = await InvokeMiddleware(100, requestAcceptEncodings: new string[] { "gzip", "deflate" }, responseType: TextPlain);
@ -88,6 +90,7 @@ namespace Microsoft.AspNetCore.ResponseCompression.Tests
app.UseResponseCompression();
app.Run(context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = contentType;
return context.Response.WriteAsync(new string('a', 100));
});
@ -101,7 +104,7 @@ namespace Microsoft.AspNetCore.ResponseCompression.Tests
var response = await client.SendAsync(request);
Assert.Equal(24, response.Content.ReadAsByteArrayAsync().Result.Length);
CheckResponseCompressed(response, expectedBodyLength: 24);
}
[Theory]
@ -119,6 +122,7 @@ namespace Microsoft.AspNetCore.ResponseCompression.Tests
app.UseResponseCompression();
app.Run(context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = contentType;
return context.Response.WriteAsync(new string('a', 100));
});
@ -132,7 +136,43 @@ namespace Microsoft.AspNetCore.ResponseCompression.Tests
var response = await client.SendAsync(request);
Assert.Equal(100, response.Content.ReadAsByteArrayAsync().Result.Length);
CheckResponseNotCompressed(response, expectedBodyLength: 100);
}
[Theory]
[InlineData("")]
[InlineData("text/plain")]
[InlineData("text/PLAIN")]
[InlineData("text/plain; charset=ISO-8859-4")]
[InlineData("text/plain ; charset=ISO-8859-4")]
[InlineData("text/plain2")]
public async Task NoBody_NotCompressed(string contentType)
{
var builder = new WebHostBuilder()
.ConfigureServices(services =>
{
services.AddResponseCompression(TextPlain);
})
.Configure(app =>
{
app.UseResponseCompression();
app.Run(context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = contentType;
return Task.FromResult(0);
});
});
var server = new TestServer(builder);
var client = server.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Get, "");
request.Headers.AcceptEncoding.ParseAdd("gzip");
var response = await client.SendAsync(request);
CheckResponseNotCompressed(response, expectedBodyLength: 0);
}
[Fact]
@ -241,6 +281,304 @@ namespace Microsoft.AspNetCore.ResponseCompression.Tests
Assert.Equal(expectedLength, response.Content.ReadAsByteArrayAsync().Result.Length);
}
[Fact]
public async Task FlushHeaders_SendsHeaders_Compresses()
{
var responseReceived = new ManualResetEvent(false);
var builder = new WebHostBuilder()
.ConfigureServices(services =>
{
services.AddResponseCompression(TextPlain);
})
.Configure(app =>
{
app.UseResponseCompression();
app.Run(context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = TextPlain;
context.Response.Body.Flush();
Assert.True(responseReceived.WaitOne(TimeSpan.FromSeconds(3)));
return context.Response.WriteAsync(new string('a', 100));
});
});
var server = new TestServer(builder);
var client = server.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Get, "");
request.Headers.AcceptEncoding.ParseAdd("gzip");
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
responseReceived.Set();
await response.Content.LoadIntoBufferAsync();
CheckResponseCompressed(response, expectedBodyLength: 24);
}
[Fact]
public async Task FlushAsyncHeaders_SendsHeaders_Compresses()
{
var responseReceived = new ManualResetEvent(false);
var builder = new WebHostBuilder()
.ConfigureServices(services =>
{
services.AddResponseCompression(TextPlain);
})
.Configure(app =>
{
app.UseResponseCompression();
app.Run(async context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = TextPlain;
await context.Response.Body.FlushAsync();
Assert.True(responseReceived.WaitOne(TimeSpan.FromSeconds(3)));
await context.Response.WriteAsync(new string('a', 100));
});
});
var server = new TestServer(builder);
var client = server.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Get, "");
request.Headers.AcceptEncoding.ParseAdd("gzip");
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
responseReceived.Set();
await response.Content.LoadIntoBufferAsync();
CheckResponseCompressed(response, expectedBodyLength: 24);
}
[Fact]
public async Task FlushBody_CompressesAndFlushes()
{
var responseReceived = new ManualResetEvent(false);
var builder = new WebHostBuilder()
.ConfigureServices(services =>
{
services.AddResponseCompression(TextPlain);
})
.Configure(app =>
{
app.UseResponseCompression();
app.Run(context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = TextPlain;
context.Response.Body.Write(new byte[10], 0, 10);
context.Response.Body.Flush();
Assert.True(responseReceived.WaitOne(TimeSpan.FromSeconds(3)));
context.Response.Body.Write(new byte[90], 0, 90);
return Task.FromResult(0);
});
});
var server = new TestServer(builder);
var client = server.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Get, "");
request.Headers.AcceptEncoding.ParseAdd("gzip");
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
IEnumerable<string> contentMD5 = null;
Assert.False(response.Headers.TryGetValues(HeaderNames.ContentMD5, out contentMD5));
Assert.Single(response.Content.Headers.ContentEncoding, "gzip");
var body = await response.Content.ReadAsStreamAsync();
var read = await body.ReadAsync(new byte[100], 0, 100);
Assert.True(read > 0);
responseReceived.Set();
read = await body.ReadAsync(new byte[100], 0, 100);
Assert.True(read > 0);
}
[Fact]
public async Task FlushAsyncBody_CompressesAndFlushes()
{
var responseReceived = new ManualResetEvent(false);
var builder = new WebHostBuilder()
.ConfigureServices(services =>
{
services.AddResponseCompression(TextPlain);
})
.Configure(app =>
{
app.UseResponseCompression();
app.Run(async context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = TextPlain;
await context.Response.WriteAsync(new string('a', 10));
await context.Response.Body.FlushAsync();
Assert.True(responseReceived.WaitOne(TimeSpan.FromSeconds(3)));
await context.Response.WriteAsync(new string('a', 90));
});
});
var server = new TestServer(builder);
var client = server.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Get, "");
request.Headers.AcceptEncoding.ParseAdd("gzip");
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
IEnumerable<string> contentMD5 = null;
Assert.False(response.Headers.TryGetValues(HeaderNames.ContentMD5, out contentMD5));
Assert.Single(response.Content.Headers.ContentEncoding, "gzip");
var body = await response.Content.ReadAsStreamAsync();
var read = await body.ReadAsync(new byte[100], 0, 100);
Assert.True(read > 0);
responseReceived.Set();
read = await body.ReadAsync(new byte[100], 0, 100);
Assert.True(read > 0);
}
[Fact]
public async Task TrickleWriteAndFlush_FlushesEachWrite()
{
var responseReceived = new[]
{
new ManualResetEvent(false),
new ManualResetEvent(false),
new ManualResetEvent(false),
new ManualResetEvent(false),
new ManualResetEvent(false),
};
var builder = new WebHostBuilder()
.ConfigureServices(services =>
{
services.AddResponseCompression(TextPlain);
})
.Configure(app =>
{
app.UseResponseCompression();
app.Run(context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = TextPlain;
context.Features.Get<IHttpBufferingFeature>()?.DisableResponseBuffering();
foreach (var signal in responseReceived)
{
context.Response.Body.Write(new byte[1], 0, 1);
context.Response.Body.Flush();
Assert.True(signal.WaitOne(TimeSpan.FromSeconds(3)));
}
return Task.FromResult(0);
});
});
var server = new TestServer(builder);
var client = server.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Get, "");
request.Headers.AcceptEncoding.ParseAdd("gzip");
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
#if NET451 // Flush not supported, compression disabled
Assert.NotNull(response.Headers.GetValues(HeaderNames.ContentMD5));
Assert.Empty(response.Content.Headers.ContentEncoding);
#elif NETCOREAPP1_0 // Flush supported, compression enabled
IEnumerable<string> contentMD5 = null;
Assert.False(response.Headers.TryGetValues(HeaderNames.ContentMD5, out contentMD5));
Assert.Single(response.Content.Headers.ContentEncoding, "gzip");
#else
Not implemented, compiler break
#endif
var body = await response.Content.ReadAsStreamAsync();
foreach (var signal in responseReceived)
{
var read = await body.ReadAsync(new byte[100], 0, 100);
Assert.True(read > 0);
signal.Set();
}
}
[Fact]
public async Task TrickleWriteAndFlushAsync_FlushesEachWrite()
{
var responseReceived = new[]
{
new ManualResetEvent(false),
new ManualResetEvent(false),
new ManualResetEvent(false),
new ManualResetEvent(false),
new ManualResetEvent(false),
};
var builder = new WebHostBuilder()
.ConfigureServices(services =>
{
services.AddResponseCompression(TextPlain);
})
.Configure(app =>
{
app.UseResponseCompression();
app.Run(async context =>
{
context.Response.Headers[HeaderNames.ContentMD5] = "MD5";
context.Response.ContentType = TextPlain;
context.Features.Get<IHttpBufferingFeature>()?.DisableResponseBuffering();
foreach (var signal in responseReceived)
{
await context.Response.WriteAsync("a");
await context.Response.Body.FlushAsync();
Assert.True(signal.WaitOne(TimeSpan.FromSeconds(3)));
}
});
});
var server = new TestServer(builder);
var client = server.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Get, "");
request.Headers.AcceptEncoding.ParseAdd("gzip");
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
#if NET451 // Flush not supported, compression disabled
Assert.NotNull(response.Headers.GetValues(HeaderNames.ContentMD5));
Assert.Empty(response.Content.Headers.ContentEncoding);
#elif NETCOREAPP1_0 // Flush supported, compression enabled
IEnumerable<string> contentMD5 = null;
Assert.False(response.Headers.TryGetValues(HeaderNames.ContentMD5, out contentMD5));
Assert.Single(response.Content.Headers.ContentEncoding, "gzip");
#else
Not implemented, compiler break
#endif
var body = await response.Content.ReadAsStreamAsync();
foreach (var signal in responseReceived)
{
var read = await body.ReadAsync(new byte[100], 0, 100);
Assert.True(read > 0);
signal.Set();
}
}
private Task<HttpResponseMessage> InvokeMiddleware(int uncompressedBodyLength, string[] requestAcceptEncodings, string responseType, Action<HttpResponse> addResponseAction = null)
{
var builder = new WebHostBuilder()