зеркало из https://github.com/dotnet/aspnetcore.git
Redis distributed/output cache: include scenario in connection metadata (#49291)
* - advertise scenario in the "CLIENT SETINFO" metadata - rev se.redis library version (this API is new) * Centralise exception handling of output cache read/write (much more important now that we have out-of-process implementations) * include lib hint in test config * rerun baseline generator * use RedisChannel.Literal on pub/sub; for pub, this makes no difference; for sub, this means "single channel, not a glob pattern" * supply logger param in tests * use [LoggerMessage] consistently * include SignalR in the redis client library name * don't use nameof for LoggerExtensions
This commit is contained in:
Родитель
b4d0e85805
Коммит
792f1f2185
|
@ -315,7 +315,7 @@
|
|||
<SeleniumWebDriverVersion>4.10.0</SeleniumWebDriverVersion>
|
||||
<SerilogExtensionsLoggingVersion>1.4.0</SerilogExtensionsLoggingVersion>
|
||||
<SerilogSinksFileVersion>4.0.0</SerilogSinksFileVersion>
|
||||
<StackExchangeRedisVersion>2.6.90</StackExchangeRedisVersion>
|
||||
<StackExchangeRedisVersion>2.6.122</StackExchangeRedisVersion>
|
||||
<SystemReactiveLinqVersion>5.0.0</SystemReactiveLinqVersion>
|
||||
<SwashbuckleAspNetCoreVersion>6.4.0</SwashbuckleAspNetCoreVersion>
|
||||
<XunitAbstractionsVersion>2.0.3</XunitAbstractionsVersion>
|
||||
|
|
|
@ -296,6 +296,7 @@ public partial class RedisCache : IDistributedCache, IDisposable
|
|||
}
|
||||
return ConnectSlowAsync(token);
|
||||
}
|
||||
|
||||
private async ValueTask<IDatabase> ConnectSlowAsync(CancellationToken token)
|
||||
{
|
||||
await _connectionLock.WaitAsync(token).ConfigureAwait(false);
|
||||
|
@ -307,14 +308,7 @@ public partial class RedisCache : IDistributedCache, IDisposable
|
|||
IConnectionMultiplexer connection;
|
||||
if (_options.ConnectionMultiplexerFactory is null)
|
||||
{
|
||||
if (_options.ConfigurationOptions is not null)
|
||||
{
|
||||
connection = await ConnectionMultiplexer.ConnectAsync(_options.ConfigurationOptions).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
connection = await ConnectionMultiplexer.ConnectAsync(_options.Configuration!).ConfigureAwait(false);
|
||||
}
|
||||
connection = await ConnectionMultiplexer.ConnectAsync(_options.GetConfiguredOptions("asp.net DC")).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -5,6 +5,7 @@ using System;
|
|||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StackExchange.Redis;
|
||||
using StackExchange.Redis.Configuration;
|
||||
using StackExchange.Redis.Profiling;
|
||||
|
||||
namespace Microsoft.Extensions.Caching.StackExchangeRedis;
|
||||
|
@ -57,4 +58,19 @@ public class RedisCacheOptions : IOptions<RedisCacheOptions>
|
|||
}
|
||||
set => _useForceReconnect = value;
|
||||
}
|
||||
|
||||
internal ConfigurationOptions GetConfiguredOptions(string libSuffix)
|
||||
{
|
||||
var options = ConfigurationOptions?.Clone() ?? ConfigurationOptions.Parse(Configuration!);
|
||||
|
||||
// we don't want an initially unavailable server to prevent DI creating the service itself
|
||||
options.AbortOnConnectFail = false;
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(libSuffix))
|
||||
{
|
||||
var provider = DefaultOptionsProvider.GetProvider(options.EndPoints);
|
||||
options.LibraryName = $"{provider.LibraryName} {libSuffix}";
|
||||
}
|
||||
return options;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -334,14 +334,7 @@ internal class RedisOutputCacheStore : IOutputCacheStore, IOutputCacheBufferStor
|
|||
IConnectionMultiplexer connection;
|
||||
if (_options.ConnectionMultiplexerFactory is null)
|
||||
{
|
||||
if (_options.ConfigurationOptions is not null)
|
||||
{
|
||||
connection = await ConnectionMultiplexer.ConnectAsync(_options.ConfigurationOptions).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
connection = await ConnectionMultiplexer.ConnectAsync(_options.Configuration!).ConfigureAwait(false);
|
||||
}
|
||||
connection = await ConnectionMultiplexer.ConnectAsync(_options.GetConfiguredOptions("asp.net OC")).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -14,7 +14,11 @@ public class RedisConnectionFixture : IDisposable
|
|||
private readonly ConnectionMultiplexer _muxer;
|
||||
public RedisConnectionFixture()
|
||||
{
|
||||
_muxer = ConnectionMultiplexer.Connect("127.0.0.1:6379");
|
||||
var options = new RedisCacheOptions
|
||||
{
|
||||
Configuration = "127.0.0.1:6379", // TODO: CI test config here
|
||||
}.GetConfiguredOptions("CI test");
|
||||
_muxer = ConnectionMultiplexer.Connect(options);
|
||||
}
|
||||
|
||||
public IDatabase Database => _muxer.GetDatabase();
|
||||
|
|
|
@ -49,4 +49,10 @@ internal static partial class LoggerExtensions
|
|||
EventName = "ExpirationExpiresExceeded")]
|
||||
internal static partial void ExpirationExpiresExceeded(this ILogger logger, DateTimeOffset responseTime);
|
||||
|
||||
[LoggerMessage(12, LogLevel.Error, "Unable to query output cache.", EventName = "UnableToQueryOutputCache")]
|
||||
internal static partial void UnableToQueryOutputCache(this ILogger logger, Exception exception);
|
||||
|
||||
[LoggerMessage(13, LogLevel.Error, "Unable to write to output-cache.", EventName = "UnableToWriteToOutputCache")]
|
||||
internal static partial void UnableToWriteToOutputCache(this ILogger logger, Exception exception);
|
||||
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ using System.Buffers;
|
|||
using System.Linq;
|
||||
using System.Text;
|
||||
using Microsoft.AspNetCore.OutputCaching.Serialization;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.OutputCaching;
|
||||
/// <summary>
|
||||
|
@ -52,7 +53,7 @@ internal static class OutputCacheEntryFormatter
|
|||
return outputCacheEntry;
|
||||
}
|
||||
|
||||
public static async ValueTask StoreAsync(string key, OutputCacheEntry value, TimeSpan duration, IOutputCacheStore store, CancellationToken cancellationToken)
|
||||
public static async ValueTask StoreAsync(string key, OutputCacheEntry value, TimeSpan duration, IOutputCacheStore store, ILogger logger, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(value);
|
||||
ArgumentNullException.ThrowIfNull(value.Body);
|
||||
|
@ -85,14 +86,25 @@ internal static class OutputCacheEntryFormatter
|
|||
}
|
||||
|
||||
var payload = new ReadOnlySequence<byte>(segment.Array!, segment.Offset, segment.Count);
|
||||
if (store is IOutputCacheBufferStore bufferStore)
|
||||
try
|
||||
{
|
||||
await bufferStore.SetAsync(key, payload, value.Tags, duration, cancellationToken);
|
||||
if (store is IOutputCacheBufferStore bufferStore)
|
||||
{
|
||||
await bufferStore.SetAsync(key, payload, value.Tags, duration, cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
// legacy API/in-proc: create an isolated right-sized byte[] for the payload
|
||||
await store.SetAsync(key, payload.ToArray(), value.Tags, duration, cancellationToken);
|
||||
}
|
||||
}
|
||||
else
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// legacy API/in-proc: create an isolated right-sized byte[] for the payload
|
||||
await store.SetAsync(key, payload.ToArray(), value.Tags, duration, cancellationToken);
|
||||
// don't report as failure
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.UnableToWriteToOutputCache(ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -336,10 +336,23 @@ internal sealed class OutputCacheMiddleware
|
|||
// Locking cache lookups by default
|
||||
// TODO: should it be part of the cache implementations or can we assume all caches would benefit from it?
|
||||
// It makes sense for caches that use IO (disk, network) or need to deserialize the state but could also be a global option
|
||||
OutputCacheEntry? cacheEntry;
|
||||
try
|
||||
{
|
||||
cacheEntry = await _outputCacheEntryDispatcher.ScheduleAsync(cacheContext.CacheKey, (Store: _store, CacheContext: cacheContext), static async (key, state) => await OutputCacheEntryFormatter.GetAsync(key, state.Store, state.CacheContext.HttpContext.RequestAborted));
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// don't report as failure
|
||||
cacheEntry = null;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.UnableToQueryOutputCache(ex);
|
||||
cacheEntry = null;
|
||||
}
|
||||
|
||||
var cacheEntry = await _outputCacheEntryDispatcher.ScheduleAsync(cacheContext.CacheKey, (Store: _store, CacheContext: cacheContext), static async (key, state) => await OutputCacheEntryFormatter.GetAsync(key, state.Store, state.CacheContext.HttpContext.RequestAborted));
|
||||
|
||||
if (await TryServeCachedResponseAsync(cacheContext, cacheEntry, policies))
|
||||
if (cacheEntry is not null && await TryServeCachedResponseAsync(cacheContext, cacheEntry, policies))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -440,7 +453,7 @@ internal sealed class OutputCacheMiddleware
|
|||
else
|
||||
{
|
||||
_logger.ResponseCached();
|
||||
await OutputCacheEntryFormatter.StoreAsync(context.CacheKey, context.CachedResponse, context.CachedResponseValidFor, _store, context.HttpContext.RequestAborted);
|
||||
await OutputCacheEntryFormatter.StoreAsync(context.CacheKey, context.CachedResponse, context.CachedResponseValidFor, _store, _logger, context.HttpContext.RequestAborted);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
// The .NET Foundation licenses this file to you under the MIT license.
|
||||
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.Extensions.Primitives;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Net.Http.Headers;
|
||||
|
||||
namespace Microsoft.AspNetCore.OutputCaching.Tests;
|
||||
|
@ -23,7 +23,7 @@ public class OutputCacheEntryFormatterTests
|
|||
Tags = Array.Empty<string>()
|
||||
};
|
||||
|
||||
await OutputCacheEntryFormatter.StoreAsync(key, entry, TimeSpan.Zero, store, default);
|
||||
await OutputCacheEntryFormatter.StoreAsync(key, entry, TimeSpan.Zero, store, NullLogger.Instance, default);
|
||||
|
||||
var result = await OutputCacheEntryFormatter.GetAsync(key, store, default);
|
||||
|
||||
|
@ -47,7 +47,7 @@ public class OutputCacheEntryFormatterTests
|
|||
Tags = new[] { "tag", "タグ" }
|
||||
};
|
||||
|
||||
await OutputCacheEntryFormatter.StoreAsync(key, entry, TimeSpan.Zero, store, default);
|
||||
await OutputCacheEntryFormatter.StoreAsync(key, entry, TimeSpan.Zero, store, NullLogger.Instance, default);
|
||||
|
||||
var result = await OutputCacheEntryFormatter.GetAsync(key, store, default);
|
||||
|
||||
|
@ -66,7 +66,7 @@ public class OutputCacheEntryFormatterTests
|
|||
Tags = new[] { null, null, "", "tag" }
|
||||
};
|
||||
|
||||
await OutputCacheEntryFormatter.StoreAsync(key, entry, TimeSpan.Zero, store, default);
|
||||
await OutputCacheEntryFormatter.StoreAsync(key, entry, TimeSpan.Zero, store, NullLogger.Instance, default);
|
||||
|
||||
var result = await OutputCacheEntryFormatter.GetAsync(key, store, default);
|
||||
|
||||
|
@ -89,7 +89,7 @@ public class OutputCacheEntryFormatterTests
|
|||
Tags = Array.Empty<string>()
|
||||
};
|
||||
|
||||
await OutputCacheEntryFormatter.StoreAsync(key, entry, TimeSpan.Zero, store, default);
|
||||
await OutputCacheEntryFormatter.StoreAsync(key, entry, TimeSpan.Zero, store, NullLogger.Instance, default);
|
||||
|
||||
var result = await OutputCacheEntryFormatter.GetAsync(key, store, default);
|
||||
|
||||
|
|
|
@ -1,14 +1,13 @@
|
|||
// Licensed to the .NET Foundation under one or more agreements.
|
||||
// The .NET Foundation licenses this file to you under the MIT license.
|
||||
|
||||
using System.Globalization;
|
||||
using System.Text;
|
||||
using System.Text.Unicode;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.OutputCaching.Memory;
|
||||
using Microsoft.AspNetCore.Testing;
|
||||
using Microsoft.Extensions.Caching.Memory;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Logging.Testing;
|
||||
using Microsoft.Extensions.Primitives;
|
||||
using Microsoft.Net.Http.Headers;
|
||||
|
@ -71,6 +70,7 @@ public class OutputCacheMiddlewareTests
|
|||
},
|
||||
TimeSpan.Zero,
|
||||
cache,
|
||||
NullLogger.Instance,
|
||||
default);
|
||||
|
||||
Assert.True(await middleware.TryServeFromCacheAsync(context, policies));
|
||||
|
@ -102,6 +102,7 @@ public class OutputCacheMiddlewareTests
|
|||
},
|
||||
TimeSpan.Zero,
|
||||
cache,
|
||||
NullLogger.Instance,
|
||||
default);
|
||||
|
||||
Assert.True(await middleware.TryServeFromCacheAsync(context, policies));
|
||||
|
@ -130,6 +131,7 @@ public class OutputCacheMiddlewareTests
|
|||
},
|
||||
TimeSpan.Zero,
|
||||
cache,
|
||||
NullLogger.Instance,
|
||||
default);
|
||||
|
||||
Assert.True(await middleware.TryServeFromCacheAsync(context, policies));
|
||||
|
|
|
@ -123,7 +123,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
var tasks = new List<Task>();
|
||||
|
||||
RedisLog.Unsubscribe(_logger, connectionChannel);
|
||||
tasks.Add(_bus.UnsubscribeAsync(connectionChannel));
|
||||
tasks.Add(_bus.UnsubscribeAsync(RedisChannel.Literal(connectionChannel)));
|
||||
|
||||
var feature = connection.Features.GetRequiredFeature<IRedisFeature>();
|
||||
var groupNames = feature.Groups;
|
||||
|
@ -295,7 +295,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
{
|
||||
await EnsureRedisServerConnection();
|
||||
RedisLog.PublishToChannel(_logger, channel);
|
||||
return await _bus!.PublishAsync(channel, payload);
|
||||
return await _bus!.PublishAsync(RedisChannel.Literal(channel), payload);
|
||||
}
|
||||
|
||||
private Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
|
||||
|
@ -328,7 +328,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
{
|
||||
var lifetimeManager = (RedisHubLifetimeManager<THub>)state;
|
||||
RedisLog.Unsubscribe(lifetimeManager._logger, channelName);
|
||||
return lifetimeManager._bus!.UnsubscribeAsync(channelName);
|
||||
return lifetimeManager._bus!.UnsubscribeAsync(RedisChannel.Literal(channelName));
|
||||
});
|
||||
|
||||
var feature = connection.Features.GetRequiredFeature<IRedisFeature>();
|
||||
|
@ -361,7 +361,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
{
|
||||
var lifetimeManager = (RedisHubLifetimeManager<THub>)state;
|
||||
RedisLog.Unsubscribe(lifetimeManager._logger, channelName);
|
||||
return lifetimeManager._bus!.UnsubscribeAsync(channelName);
|
||||
return lifetimeManager._bus!.UnsubscribeAsync(RedisChannel.Literal(channelName));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -448,7 +448,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
private async Task SubscribeToAll()
|
||||
{
|
||||
RedisLog.Subscribing(_logger, _channels.All);
|
||||
var channel = await _bus!.SubscribeAsync(_channels.All);
|
||||
var channel = await _bus!.SubscribeAsync(RedisChannel.Literal(_channels.All));
|
||||
channel.OnMessage(async channelMessage =>
|
||||
{
|
||||
try
|
||||
|
@ -478,7 +478,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
|
||||
private async Task SubscribeToGroupManagementChannel()
|
||||
{
|
||||
var channel = await _bus!.SubscribeAsync(_channels.GroupManagement);
|
||||
var channel = await _bus!.SubscribeAsync(RedisChannel.Literal(_channels.GroupManagement));
|
||||
channel.OnMessage(async channelMessage =>
|
||||
{
|
||||
try
|
||||
|
@ -515,7 +515,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
private async Task SubscribeToAckChannel()
|
||||
{
|
||||
// Create server specific channel in order to send an ack to a single server
|
||||
var channel = await _bus!.SubscribeAsync(_channels.Ack(_serverName));
|
||||
var channel = await _bus!.SubscribeAsync(RedisChannel.Literal(_channels.Ack(_serverName)));
|
||||
channel.OnMessage(channelMessage =>
|
||||
{
|
||||
var ackId = RedisProtocol.ReadAck(channelMessage.Message);
|
||||
|
@ -529,7 +529,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
var connectionChannel = _channels.Connection(connection.ConnectionId);
|
||||
|
||||
RedisLog.Subscribing(_logger, connectionChannel);
|
||||
var channel = await _bus!.SubscribeAsync(connectionChannel);
|
||||
var channel = await _bus!.SubscribeAsync(RedisChannel.Literal(connectionChannel));
|
||||
channel.OnMessage(channelMessage =>
|
||||
{
|
||||
var invocation = RedisProtocol.ReadInvocation(channelMessage.Message);
|
||||
|
@ -586,7 +586,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
return _users.AddSubscriptionAsync(userChannel, connection, async (channelName, subscriptions) =>
|
||||
{
|
||||
RedisLog.Subscribing(_logger, channelName);
|
||||
var channel = await _bus!.SubscribeAsync(channelName);
|
||||
var channel = await _bus!.SubscribeAsync(RedisChannel.Literal(channelName));
|
||||
channel.OnMessage(async channelMessage =>
|
||||
{
|
||||
try
|
||||
|
@ -612,7 +612,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
private async Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore groupConnections)
|
||||
{
|
||||
RedisLog.Subscribing(_logger, groupChannel);
|
||||
var channel = await _bus!.SubscribeAsync(groupChannel);
|
||||
var channel = await _bus!.SubscribeAsync(RedisChannel.Literal(groupChannel));
|
||||
channel.OnMessage(async (channelMessage) =>
|
||||
{
|
||||
try
|
||||
|
@ -641,7 +641,7 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
|
|||
|
||||
private async Task SubscribeToReturnResultsAsync()
|
||||
{
|
||||
var channel = await _bus!.SubscribeAsync(_channels.ReturnResults);
|
||||
var channel = await _bus!.SubscribeAsync(RedisChannel.Literal(_channels.ReturnResults));
|
||||
channel.OnMessage((channelMessage) =>
|
||||
{
|
||||
var completion = RedisProtocol.ReadCompletion(channelMessage.Message);
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System.Net;
|
||||
using StackExchange.Redis;
|
||||
using StackExchange.Redis.Configuration;
|
||||
|
||||
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis;
|
||||
|
||||
|
@ -38,6 +39,10 @@ public class RedisOptions
|
|||
Configuration.SetDefaultPorts();
|
||||
}
|
||||
|
||||
// suffix SignalR onto the declared library name
|
||||
var provider = DefaultOptionsProvider.GetProvider(Configuration.EndPoints);
|
||||
Configuration.LibraryName = $"{provider.LibraryName} SignalR";
|
||||
|
||||
return await ConnectionMultiplexer.ConnectAsync(Configuration, log);
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче