Support multiple configuration sources (#1534)
This commit is contained in:
Родитель
e1af15fca0
Коммит
5cb206c51e
|
@ -34,6 +34,22 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
|
|||
}
|
||||
```
|
||||
|
||||
## Multiple Configuration Sources
|
||||
As of 1.1, YARP supports loading the proxy configuration from multiple sources. LoadFromConfig may be called multiple times referencing different IConfiguration sections or may be combine with a different config source like InMemory. Routes can reference clusters from other sources. Note merging partial config from different sources for a given route or cluster is not supported.
|
||||
|
||||
```
|
||||
services.AddReverseProxy()
|
||||
.LoadFromConfig(Configuration.GetSection("ReverseProxy1"))
|
||||
.LoadFromConfig(Configuration.GetSection("ReverseProxy2"));
|
||||
```
|
||||
or
|
||||
```
|
||||
|
||||
services.AddReverseProxy()
|
||||
.LoadFromMemory(routes, clusters)
|
||||
.LoadFromConfig(Configuration.GetSection("ReverseProxy"));
|
||||
```
|
||||
|
||||
## Configuration contract
|
||||
File-based configuration is dynamically mapped to the types in [Yarp.ReverseProxy.Configuration](xref:Yarp.ReverseProxy.Configuration) namespace by an [IProxyConfigProvider](xref:Yarp.ReverseProxy.Configuration.IProxyConfigProvider) implementation converts at application start and each time the configuration changes.
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ The proxy will validate the given configuration and if it's invalid, an exceptio
|
|||
The configuration objects and collections supplied to the proxy should be read-only and not modified once they have been handed to the proxy via `GetConfig()`.
|
||||
|
||||
### Reload
|
||||
If the `IChangeToken` supports `ActiveChangeCallbacks`, once the proxy has processed the initial set of configuration it will register a callback with this token. Note the proxy does not support polling for changes.
|
||||
If the `IChangeToken` supports `ActiveChangeCallbacks`, once the proxy has processed the initial set of configuration it will register a callback with this token. If the provider does not support callbacks then `HasChanged` will be polled every 5 minutes.
|
||||
|
||||
When the provider wants to provide new configuration to the proxy it should:
|
||||
- load that configuration in the background.
|
||||
|
@ -53,6 +53,23 @@ There are important differences when reloading configuration vs the first config
|
|||
|
||||
Once the new configuration has been validated and applied, the proxy will register a callback with the new `IChangeToken`. Note if there are multiple reloads signaled in close succession, the proxy may skip some and load the next available configuration as soon as it's ready. Each `IProxyConfig` contains the full configuration state so nothing will be lost.
|
||||
|
||||
## Multiple Configuration Sources
|
||||
As of 1.1, YARP supports loading the proxy configuration from multiple sources. Multiple `IProxyConfigProvider`'s can be registered as singleton services and all will be resolved and combine. The sources may be the same or different types such as IConfiguration or InMemory. Routes can reference clusters from other sources. Note merging partial config from different sources for a given route or cluster is not supported.
|
||||
|
||||
```
|
||||
services.AddReverseProxy()
|
||||
.LoadFromConfig(Configuration.GetSection("ReverseProxy1"))
|
||||
.LoadFromConfig(Configuration.GetSection("ReverseProxy2"));
|
||||
```
|
||||
or
|
||||
```
|
||||
|
||||
services.AddReverseProxy()
|
||||
.LoadFromMemory(routes, clusters)
|
||||
.LoadFromConfig(Configuration.GetSection("ReverseProxy"));
|
||||
```
|
||||
|
||||
|
||||
## Example
|
||||
The following is an example `IProxyConfigProvider` that has routes and clusters manually loaded into it.
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ using Microsoft.Extensions.Primitives;
|
|||
namespace Yarp.ReverseProxy.Configuration;
|
||||
|
||||
/// <summary>
|
||||
/// Represents a snapshot of proxy configuration data.
|
||||
/// Represents a snapshot of proxy configuration data. These properties may be accessed multiple times and should not be modified.
|
||||
/// </summary>
|
||||
public interface IProxyConfig
|
||||
{
|
||||
|
|
|
@ -34,9 +34,10 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
{
|
||||
private static readonly IReadOnlyDictionary<string, ClusterConfig> _emptyClusterDictionary = new ReadOnlyDictionary<string, ClusterConfig>(new Dictionary<string, ClusterConfig>());
|
||||
|
||||
private readonly object _syncRoot = new object();
|
||||
private readonly object _syncRoot = new();
|
||||
private readonly ILogger<ProxyConfigManager> _logger;
|
||||
private readonly IProxyConfigProvider _provider;
|
||||
private readonly IProxyConfigProvider[] _providers;
|
||||
private readonly ConfigState[] _configs;
|
||||
private readonly IClusterChangeListener[] _clusterChangeListeners;
|
||||
private readonly ConcurrentDictionary<string, ClusterState> _clusters = new(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly ConcurrentDictionary<string, RouteState> _routes = new(StringComparer.OrdinalIgnoreCase);
|
||||
|
@ -48,15 +49,16 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
private readonly List<Action<EndpointBuilder>> _conventions;
|
||||
private readonly IActiveHealthCheckMonitor _activeHealthCheckMonitor;
|
||||
private readonly IClusterDestinationsUpdater _clusterDestinationsUpdater;
|
||||
private IDisposable? _changeSubscription;
|
||||
|
||||
private List<Endpoint>? _endpoints;
|
||||
private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
|
||||
private IChangeToken _changeToken;
|
||||
private CancellationTokenSource _endpointsChangeSource = new();
|
||||
private IChangeToken _endpointsChangeToken;
|
||||
|
||||
private CancellationTokenSource _configChangeSource = new();
|
||||
|
||||
public ProxyConfigManager(
|
||||
ILogger<ProxyConfigManager> logger,
|
||||
IProxyConfigProvider provider,
|
||||
IEnumerable<IProxyConfigProvider> providers,
|
||||
IEnumerable<IClusterChangeListener> clusterChangeListeners,
|
||||
IEnumerable<IProxyConfigFilter> filters,
|
||||
IConfigValidator configValidator,
|
||||
|
@ -67,7 +69,7 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
IClusterDestinationsUpdater clusterDestinationsUpdater)
|
||||
{
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_provider = provider ?? throw new ArgumentNullException(nameof(provider));
|
||||
_providers = providers?.ToArray() ?? throw new ArgumentNullException(nameof(providers));
|
||||
_clusterChangeListeners = (clusterChangeListeners as IClusterChangeListener[])
|
||||
?? clusterChangeListeners?.ToArray() ?? throw new ArgumentNullException(nameof(clusterChangeListeners));
|
||||
_filters = (filters as IProxyConfigFilter[]) ?? filters?.ToArray() ?? throw new ArgumentNullException(nameof(filters));
|
||||
|
@ -78,10 +80,17 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
_activeHealthCheckMonitor = activeHealthCheckMonitor ?? throw new ArgumentNullException(nameof(activeHealthCheckMonitor));
|
||||
_clusterDestinationsUpdater = clusterDestinationsUpdater ?? throw new ArgumentNullException(nameof(clusterDestinationsUpdater));
|
||||
|
||||
if (_providers.Length == 0)
|
||||
{
|
||||
throw new ArgumentException($"At least one {nameof(IProxyConfigProvider)} is required.", nameof(providers));
|
||||
}
|
||||
|
||||
_configs = new ConfigState[_providers.Length];
|
||||
|
||||
_conventions = new List<Action<EndpointBuilder>>();
|
||||
DefaultBuilder = new ReverseProxyConventionBuilder(_conventions);
|
||||
|
||||
_changeToken = new CancellationChangeToken(_cancellationTokenSource.Token);
|
||||
_endpointsChangeToken = new CancellationChangeToken(_endpointsChangeSource.Token);
|
||||
}
|
||||
|
||||
public ReverseProxyConventionBuilder DefaultBuilder { get; }
|
||||
|
@ -130,24 +139,30 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public override IChangeToken GetChangeToken() => Volatile.Read(ref _changeToken);
|
||||
public override IChangeToken GetChangeToken() => Volatile.Read(ref _endpointsChangeToken);
|
||||
|
||||
// IProxyConfigManager
|
||||
|
||||
/// <inheritdoc/>
|
||||
public async Task<EndpointDataSource> InitialLoadAsync()
|
||||
internal async Task<EndpointDataSource> InitialLoadAsync()
|
||||
{
|
||||
// Trigger the first load immediately and throw if it fails.
|
||||
// We intend this to crash the app so we don't try listening for further changes.
|
||||
try
|
||||
{
|
||||
var config = _provider.GetConfig();
|
||||
await ApplyConfigAsync(config);
|
||||
var routes = new List<RouteConfig>();
|
||||
var clusters = new List<ClusterConfig>();
|
||||
|
||||
if (config.ChangeToken.ActiveChangeCallbacks)
|
||||
for (var i = 0; i < _providers.Length; i++)
|
||||
{
|
||||
_changeSubscription = config.ChangeToken.RegisterChangeCallback(ReloadConfig, this);
|
||||
var provider = _providers[i];
|
||||
var config = provider.GetConfig();
|
||||
ValidateConfigProperties(config);
|
||||
_configs[i] = new ConfigState(provider, config);
|
||||
routes.AddRange(config.Routes ?? Array.Empty<RouteConfig>());
|
||||
clusters.AddRange(config.Clusters ?? Array.Empty<ClusterConfig>());
|
||||
}
|
||||
|
||||
await ApplyConfigAsync(routes, clusters);
|
||||
|
||||
ListenForConfigChanges();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
@ -160,57 +175,133 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
return this;
|
||||
}
|
||||
|
||||
private static void ReloadConfig(object state)
|
||||
{
|
||||
var manager = (ProxyConfigManager)state;
|
||||
_ = manager.ReloadConfigAsync();
|
||||
}
|
||||
|
||||
private async Task ReloadConfigAsync()
|
||||
{
|
||||
_changeSubscription?.Dispose();
|
||||
_configChangeSource.Dispose();
|
||||
|
||||
IProxyConfig newConfig;
|
||||
try
|
||||
var sourcesChanged = false;
|
||||
var routes = new List<RouteConfig>();
|
||||
var clusters = new List<ClusterConfig>();
|
||||
foreach (var instance in _configs)
|
||||
{
|
||||
newConfig = _provider.GetConfig();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.ErrorReloadingConfig(_logger, ex);
|
||||
// If we can't load the config then we can't listen for changes anymore.
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var hasChanged = await ApplyConfigAsync(newConfig);
|
||||
lock (_syncRoot)
|
||||
try
|
||||
{
|
||||
// Skip if changes are signaled before the endpoints are initialized for the first time.
|
||||
// The endpoint conventions might not be ready yet.
|
||||
if (hasChanged && _endpoints != null)
|
||||
if (instance.LatestConfig.ChangeToken.HasChanged)
|
||||
{
|
||||
CreateEndpoints();
|
||||
var config = instance.Provider.GetConfig();
|
||||
ValidateConfigProperties(config);
|
||||
instance.LatestConfig = config;
|
||||
instance.LoadFailed = false;
|
||||
sourcesChanged = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.ErrorApplyingConfig(_logger, ex);
|
||||
catch (Exception ex)
|
||||
{
|
||||
instance.LoadFailed = true;
|
||||
Log.ErrorReloadingConfig(_logger, ex);
|
||||
}
|
||||
|
||||
// If we didn't/couldn't get a new config then re-use the last one.
|
||||
routes.AddRange(instance.LatestConfig.Routes ?? Array.Empty<RouteConfig>());
|
||||
clusters.AddRange(instance.LatestConfig.Clusters ?? Array.Empty<ClusterConfig>());
|
||||
}
|
||||
|
||||
if (newConfig.ChangeToken.ActiveChangeCallbacks)
|
||||
// Only reload if at least one provider changed.
|
||||
if (sourcesChanged)
|
||||
{
|
||||
_changeSubscription = newConfig.ChangeToken.RegisterChangeCallback(ReloadConfig, this);
|
||||
try
|
||||
{
|
||||
var hasChanged = await ApplyConfigAsync(routes, clusters);
|
||||
lock (_syncRoot)
|
||||
{
|
||||
// Skip if changes are signaled before the endpoints are initialized for the first time.
|
||||
// The endpoint conventions might not be ready yet.
|
||||
if (hasChanged && _endpoints != null)
|
||||
{
|
||||
CreateEndpoints();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.ErrorApplyingConfig(_logger, ex);
|
||||
}
|
||||
}
|
||||
|
||||
ListenForConfigChanges();
|
||||
}
|
||||
|
||||
private static void ValidateConfigProperties(IProxyConfig config)
|
||||
{
|
||||
if (config == null)
|
||||
{
|
||||
throw new InvalidOperationException($"{nameof(IProxyConfigProvider.GetConfig)} returned a null value.");
|
||||
}
|
||||
if (config.ChangeToken == null)
|
||||
{
|
||||
throw new InvalidOperationException($"{nameof(IProxyConfig.ChangeToken)} has a null value.");
|
||||
}
|
||||
}
|
||||
|
||||
private void ListenForConfigChanges()
|
||||
{
|
||||
// Use a central change token to avoid overlap between different sources.
|
||||
var source = new CancellationTokenSource();
|
||||
_configChangeSource = source;
|
||||
var poll = false;
|
||||
|
||||
foreach (var configState in _configs)
|
||||
{
|
||||
if (configState.LoadFailed)
|
||||
{
|
||||
// We can't register for change notifications if the last load failed.
|
||||
poll = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
configState.CallbackCleanup?.Dispose();
|
||||
var token = configState.LatestConfig.ChangeToken;
|
||||
if (token.ActiveChangeCallbacks)
|
||||
{
|
||||
configState.CallbackCleanup = token.RegisterChangeCallback(SignalChange, source);
|
||||
}
|
||||
else
|
||||
{
|
||||
poll = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (poll)
|
||||
{
|
||||
source.CancelAfter(TimeSpan.FromMinutes(5));
|
||||
}
|
||||
|
||||
// Don't register until we're done hooking everything up to avoid cancellation races.
|
||||
source.Token.Register(ReloadConfig, this);
|
||||
|
||||
static void SignalChange(object obj)
|
||||
{
|
||||
var token = (CancellationTokenSource)obj;
|
||||
try
|
||||
{
|
||||
token.Cancel();
|
||||
}
|
||||
// Don't throw if the source was already disposed.
|
||||
catch (ObjectDisposedException) { }
|
||||
}
|
||||
|
||||
static void ReloadConfig(object? state)
|
||||
{
|
||||
var manager = (ProxyConfigManager)state!;
|
||||
_ = manager.ReloadConfigAsync();
|
||||
}
|
||||
}
|
||||
|
||||
// Throws for validation failures
|
||||
private async Task<bool> ApplyConfigAsync(IProxyConfig config)
|
||||
private async Task<bool> ApplyConfigAsync(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters)
|
||||
{
|
||||
var (configuredClusters, clusterErrors) = await VerifyClustersAsync(config.Clusters, cancellation: default);
|
||||
var (configuredRoutes, routeErrors) = await VerifyRoutesAsync(config.Routes, configuredClusters, cancellation: default);
|
||||
var (configuredClusters, clusterErrors) = await VerifyClustersAsync(clusters, cancellation: default);
|
||||
var (configuredRoutes, routeErrors) = await VerifyRoutesAsync(routes, configuredClusters, cancellation: default);
|
||||
|
||||
if (routeErrors.Count > 0 || clusterErrors.Count > 0)
|
||||
{
|
||||
|
@ -539,7 +630,7 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
// NOTE 1: Remove is safe to do within the `foreach` loop on ConcurrentDictionary
|
||||
//
|
||||
// NOTE 2: Removing the route from _routes is safe and existing
|
||||
// ASP .NET Core endpoints will continue to work with their existing behavior since
|
||||
// ASP.NET Core endpoints will continue to work with their existing behavior since
|
||||
// their copy of `RouteModel` is immutable and remains operational in whichever state is was in.
|
||||
Log.RouteRemoved(_logger, routeId);
|
||||
var removed = _routes.TryRemove(routeId, out var _);
|
||||
|
@ -568,14 +659,14 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
// These steps are done in a specific order to ensure callers always see a consistent state.
|
||||
|
||||
// Step 1 - capture old token
|
||||
var oldCancellationTokenSource = _cancellationTokenSource;
|
||||
var oldCancellationTokenSource = _endpointsChangeSource;
|
||||
|
||||
// Step 2 - update endpoints
|
||||
Volatile.Write(ref _endpoints, endpoints);
|
||||
|
||||
// Step 3 - create new change token
|
||||
_cancellationTokenSource = new CancellationTokenSource();
|
||||
Volatile.Write(ref _changeToken, new CancellationChangeToken(_cancellationTokenSource.Token));
|
||||
_endpointsChangeSource = new CancellationTokenSource();
|
||||
Volatile.Write(ref _endpointsChangeToken, new CancellationChangeToken(_endpointsChangeSource.Token));
|
||||
|
||||
// Step 4 - trigger old token
|
||||
oldCancellationTokenSource?.Cancel();
|
||||
|
@ -591,7 +682,28 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
|
||||
public void Dispose()
|
||||
{
|
||||
_changeSubscription?.Dispose();
|
||||
_configChangeSource.Dispose();
|
||||
foreach (var instance in _configs)
|
||||
{
|
||||
instance.CallbackCleanup?.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
private class ConfigState
|
||||
{
|
||||
public ConfigState(IProxyConfigProvider provider, IProxyConfig config)
|
||||
{
|
||||
Provider = provider;
|
||||
LatestConfig = config;
|
||||
}
|
||||
|
||||
public IProxyConfigProvider Provider { get; }
|
||||
|
||||
public IProxyConfig LatestConfig { get; set; }
|
||||
|
||||
public bool LoadFailed { get; set; }
|
||||
|
||||
public IDisposable? CallbackCleanup { get; set; }
|
||||
}
|
||||
|
||||
private static class Log
|
||||
|
@ -644,7 +756,7 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
|
|||
private static readonly Action<ILogger, Exception> _errorReloadingConfig = LoggerMessage.Define(
|
||||
LogLevel.Error,
|
||||
EventIds.ErrorReloadingConfig,
|
||||
"Failed to reload config. Unable to listen for future changes.");
|
||||
"Failed to reload config. Unable to register for change notifications, polling for changes until successful.");
|
||||
|
||||
private static readonly Action<ILogger, Exception> _errorApplyingConfig = LoggerMessage.Define(
|
||||
LogLevel.Error,
|
||||
|
|
|
@ -28,7 +28,7 @@ namespace Yarp.ReverseProxy.Management.Tests;
|
|||
|
||||
public class ProxyConfigManagerTests
|
||||
{
|
||||
private IServiceProvider CreateServices(List<RouteConfig> routes, List<ClusterConfig> clusters, Action<IReverseProxyBuilder> configureProxy = null)
|
||||
private static IServiceProvider CreateServices(List<RouteConfig> routes, List<ClusterConfig> clusters, Action<IReverseProxyBuilder> configureProxy = null)
|
||||
{
|
||||
var serviceCollection = new ServiceCollection();
|
||||
serviceCollection.AddLogging();
|
||||
|
@ -45,6 +45,27 @@ public class ProxyConfigManagerTests
|
|||
return services;
|
||||
}
|
||||
|
||||
private static IServiceProvider CreateServices(IEnumerable<IProxyConfigProvider> configProviders, Action<IReverseProxyBuilder> configureProxy = null)
|
||||
{
|
||||
var serviceCollection = new ServiceCollection();
|
||||
serviceCollection.AddLogging();
|
||||
serviceCollection.AddRouting();
|
||||
var proxyBuilder = serviceCollection.AddReverseProxy();
|
||||
foreach (var configProvider in configProviders)
|
||||
{
|
||||
serviceCollection.AddSingleton(configProvider);
|
||||
}
|
||||
serviceCollection.TryAddSingleton(new Mock<IWebHostEnvironment>().Object);
|
||||
var activeHealthPolicy = new Mock<IActiveHealthCheckPolicy>();
|
||||
activeHealthPolicy.SetupGet(p => p.Name).Returns("activePolicyA");
|
||||
serviceCollection.AddSingleton(activeHealthPolicy.Object);
|
||||
configureProxy?.Invoke(proxyBuilder);
|
||||
var services = serviceCollection.BuildServiceProvider();
|
||||
var routeBuilder = services.GetRequiredService<ProxyEndpointFactory>();
|
||||
routeBuilder.SetProxyPipeline(context => Task.CompletedTask);
|
||||
return services;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Constructor_Works()
|
||||
{
|
||||
|
@ -146,6 +167,180 @@ public class ProxyConfigManagerTests
|
|||
Assert.Equal(TestAddress, destination.Model.Config.Address);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task BuildConfig_TwoDistinctConfigs_Works()
|
||||
{
|
||||
const string TestAddress = "https://localhost:123/";
|
||||
|
||||
var cluster1 = new ClusterConfig
|
||||
{
|
||||
ClusterId = "cluster1",
|
||||
Destinations = new Dictionary<string, DestinationConfig>(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
{ "d1", new DestinationConfig { Address = TestAddress } }
|
||||
}
|
||||
};
|
||||
var route1 = new RouteConfig
|
||||
{
|
||||
RouteId = "route1",
|
||||
ClusterId = "cluster1",
|
||||
Match = new RouteMatch { Path = "/" }
|
||||
};
|
||||
|
||||
var cluster2 = new ClusterConfig
|
||||
{
|
||||
ClusterId = "cluster2",
|
||||
Destinations = new Dictionary<string, DestinationConfig>(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
{ "d2", new DestinationConfig { Address = TestAddress } }
|
||||
}
|
||||
};
|
||||
var route2 = new RouteConfig
|
||||
{
|
||||
RouteId = "route2",
|
||||
ClusterId = "cluster2",
|
||||
Match = new RouteMatch { Path = "/" }
|
||||
};
|
||||
|
||||
var config1 = new InMemoryConfigProvider(new List<RouteConfig>() { route1 }, new List<ClusterConfig>() { cluster1 });
|
||||
var config2 = new InMemoryConfigProvider(new List<RouteConfig>() { route2 }, new List<ClusterConfig>() { cluster2 });
|
||||
|
||||
var services = CreateServices(new[] { config1, config2 });
|
||||
|
||||
var manager = services.GetRequiredService<ProxyConfigManager>();
|
||||
var dataSource = await manager.InitialLoadAsync();
|
||||
|
||||
Assert.NotNull(dataSource);
|
||||
var endpoints = dataSource.Endpoints;
|
||||
Assert.Equal(2, endpoints.Count);
|
||||
|
||||
// The order is unstable because routes are stored in a dictionary.
|
||||
var routeConfig = endpoints.Single(e => string.Equals(e.DisplayName, "route1")).Metadata.GetMetadata<RouteModel>();
|
||||
Assert.NotNull(routeConfig);
|
||||
Assert.Equal("route1", routeConfig.Config.RouteId);
|
||||
|
||||
var clusterState = routeConfig.Cluster;
|
||||
Assert.NotNull(clusterState);
|
||||
|
||||
Assert.Equal("cluster1", clusterState.ClusterId);
|
||||
Assert.NotNull(clusterState.Destinations);
|
||||
Assert.NotNull(clusterState.Model);
|
||||
Assert.NotNull(clusterState.Model.HttpClient);
|
||||
Assert.Same(clusterState, routeConfig.Cluster);
|
||||
|
||||
var actualDestinations = clusterState.Destinations.Values;
|
||||
var destination = Assert.Single(actualDestinations);
|
||||
Assert.Equal("d1", destination.DestinationId);
|
||||
Assert.NotNull(destination.Model);
|
||||
Assert.Equal(TestAddress, destination.Model.Config.Address);
|
||||
|
||||
routeConfig = endpoints.Single(e => string.Equals(e.DisplayName, "route2")).Metadata.GetMetadata<RouteModel>();
|
||||
Assert.NotNull(routeConfig);
|
||||
Assert.Equal("route2", routeConfig.Config.RouteId);
|
||||
|
||||
clusterState = routeConfig.Cluster;
|
||||
Assert.NotNull(clusterState);
|
||||
|
||||
Assert.Equal("cluster2", clusterState.ClusterId);
|
||||
Assert.NotNull(clusterState.Destinations);
|
||||
Assert.NotNull(clusterState.Model);
|
||||
Assert.NotNull(clusterState.Model.HttpClient);
|
||||
Assert.Same(clusterState, routeConfig.Cluster);
|
||||
|
||||
actualDestinations = clusterState.Destinations.Values;
|
||||
destination = Assert.Single(actualDestinations);
|
||||
Assert.Equal("d2", destination.DestinationId);
|
||||
Assert.NotNull(destination.Model);
|
||||
Assert.Equal(TestAddress, destination.Model.Config.Address);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task BuildConfig_TwoOverlappingConfigs_Works()
|
||||
{
|
||||
const string TestAddress = "https://localhost:123/";
|
||||
|
||||
var cluster1 = new ClusterConfig
|
||||
{
|
||||
ClusterId = "cluster1",
|
||||
Destinations = new Dictionary<string, DestinationConfig>(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
{ "d1", new DestinationConfig { Address = TestAddress } }
|
||||
}
|
||||
};
|
||||
var cluster2 = new ClusterConfig
|
||||
{
|
||||
ClusterId = "cluster2",
|
||||
Destinations = new Dictionary<string, DestinationConfig>(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
{ "d2", new DestinationConfig { Address = TestAddress } }
|
||||
}
|
||||
};
|
||||
|
||||
var route1 = new RouteConfig
|
||||
{
|
||||
RouteId = "route1",
|
||||
ClusterId = "cluster1",
|
||||
Match = new RouteMatch { Path = "/" }
|
||||
};
|
||||
var route2 = new RouteConfig
|
||||
{
|
||||
RouteId = "route2",
|
||||
ClusterId = "cluster2",
|
||||
Match = new RouteMatch { Path = "/" }
|
||||
};
|
||||
|
||||
var config1 = new InMemoryConfigProvider(new List<RouteConfig>() { route2 }, new List<ClusterConfig>() { cluster1 });
|
||||
var config2 = new InMemoryConfigProvider(new List<RouteConfig>() { route1 }, new List<ClusterConfig>() { cluster2 });
|
||||
|
||||
var services = CreateServices(new[] { config1, config2 });
|
||||
|
||||
var manager = services.GetRequiredService<ProxyConfigManager>();
|
||||
var dataSource = await manager.InitialLoadAsync();
|
||||
|
||||
Assert.NotNull(dataSource);
|
||||
var endpoints = dataSource.Endpoints;
|
||||
Assert.Equal(2, endpoints.Count);
|
||||
|
||||
// The order is unstable because routes are stored in a dictionary.
|
||||
var routeConfig = endpoints.Single(e => string.Equals(e.DisplayName, "route1")).Metadata.GetMetadata<RouteModel>();
|
||||
Assert.NotNull(routeConfig);
|
||||
Assert.Equal("route1", routeConfig.Config.RouteId);
|
||||
|
||||
var clusterState = routeConfig.Cluster;
|
||||
Assert.NotNull(clusterState);
|
||||
|
||||
Assert.Equal("cluster1", clusterState.ClusterId);
|
||||
Assert.NotNull(clusterState.Destinations);
|
||||
Assert.NotNull(clusterState.Model);
|
||||
Assert.NotNull(clusterState.Model.HttpClient);
|
||||
Assert.Same(clusterState, routeConfig.Cluster);
|
||||
|
||||
var actualDestinations = clusterState.Destinations.Values;
|
||||
var destination = Assert.Single(actualDestinations);
|
||||
Assert.Equal("d1", destination.DestinationId);
|
||||
Assert.NotNull(destination.Model);
|
||||
Assert.Equal(TestAddress, destination.Model.Config.Address);
|
||||
|
||||
routeConfig = endpoints.Single(e => string.Equals(e.DisplayName, "route2")).Metadata.GetMetadata<RouteModel>();
|
||||
Assert.NotNull(routeConfig);
|
||||
Assert.Equal("route2", routeConfig.Config.RouteId);
|
||||
|
||||
clusterState = routeConfig.Cluster;
|
||||
Assert.NotNull(clusterState);
|
||||
|
||||
Assert.Equal("cluster2", clusterState.ClusterId);
|
||||
Assert.NotNull(clusterState.Destinations);
|
||||
Assert.NotNull(clusterState.Model);
|
||||
Assert.NotNull(clusterState.Model.HttpClient);
|
||||
Assert.Same(clusterState, routeConfig.Cluster);
|
||||
|
||||
actualDestinations = clusterState.Destinations.Values;
|
||||
destination = Assert.Single(actualDestinations);
|
||||
Assert.Equal("d2", destination.DestinationId);
|
||||
Assert.NotNull(destination.Model);
|
||||
Assert.Equal(TestAddress, destination.Model.Config.Address);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task InitialLoadAsync_ProxyHttpClientOptionsSet_CreateAndSetHttpClient()
|
||||
{
|
||||
|
@ -250,6 +445,57 @@ public class ProxyConfigManagerTests
|
|||
Assert.NotNull(readEndpoints2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetChangeToken_MultipleConfigs_SignalsChange()
|
||||
{
|
||||
var config1 = new InMemoryConfigProvider(new List<RouteConfig>(), new List<ClusterConfig>());
|
||||
var config2 = new InMemoryConfigProvider(new List<RouteConfig>(), new List<ClusterConfig>());
|
||||
var services = CreateServices(new[] { config1, config2 });
|
||||
var configManager = services.GetRequiredService<ProxyConfigManager>();
|
||||
var dataSource = await configManager.InitialLoadAsync();
|
||||
_ = configManager.Endpoints; // Lazily creates endpoints the first time, activates change notifications.
|
||||
|
||||
var signaled1 = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var signaled2 = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
IReadOnlyList<Endpoint> readEndpoints1 = null;
|
||||
IReadOnlyList<Endpoint> readEndpoints2 = null;
|
||||
|
||||
var changeToken1 = dataSource.GetChangeToken();
|
||||
changeToken1.RegisterChangeCallback(
|
||||
_ =>
|
||||
{
|
||||
readEndpoints1 = dataSource.Endpoints;
|
||||
signaled1.SetResult(1);
|
||||
}, null);
|
||||
|
||||
// updating should signal the current change token
|
||||
Assert.False(signaled1.Task.IsCompleted);
|
||||
config1.Update(new List<RouteConfig>() { new RouteConfig() { RouteId = "r1", Match = new RouteMatch { Path = "/" } } }, new List<ClusterConfig>());
|
||||
await signaled1.Task.DefaultTimeout();
|
||||
|
||||
var changeToken2 = dataSource.GetChangeToken();
|
||||
changeToken2.RegisterChangeCallback(
|
||||
_ =>
|
||||
{
|
||||
readEndpoints2 = dataSource.Endpoints;
|
||||
signaled2.SetResult(1);
|
||||
}, null);
|
||||
|
||||
// updating again should only signal the new change token
|
||||
Assert.False(signaled2.Task.IsCompleted);
|
||||
config2.Update(new List<RouteConfig>() { new RouteConfig() { RouteId = "r2", Match = new RouteMatch { Path = "/" } } }, new List<ClusterConfig>());
|
||||
await signaled2.Task.DefaultTimeout();
|
||||
|
||||
var endpoint = Assert.Single(readEndpoints1);
|
||||
Assert.Equal("r1", endpoint.DisplayName);
|
||||
|
||||
Assert.NotNull(readEndpoints2);
|
||||
Assert.Equal(2, readEndpoints2.Count);
|
||||
// Ordering is unstable due to dictionary storage.
|
||||
readEndpoints2.Single(e => string.Equals(e.DisplayName, "r1"));
|
||||
readEndpoints2.Single(e => string.Equals(e.DisplayName, "r2"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ChangeConfig_ActiveHealthCheckIsEnabled_RunInitialCheck()
|
||||
{
|
||||
|
@ -336,6 +582,27 @@ public class ProxyConfigManagerTests
|
|||
Assert.StartsWith($"Route '{routeName}' requires Hosts or Path specified", argex.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task LoadAsync_MultipleSourcesWithValidationErrors_Throws()
|
||||
{
|
||||
var route1 = new RouteConfig { RouteId = "route1", Match = new RouteMatch { Hosts = null }, ClusterId = "cluster1" };
|
||||
var provider1 = new InMemoryConfigProvider(new List<RouteConfig>() { route1 }, new List<ClusterConfig>());
|
||||
var cluster1 = new ClusterConfig { ClusterId = "cluster1", HttpClient = new HttpClientConfig { MaxConnectionsPerServer = -1 } };
|
||||
var provider2 = new InMemoryConfigProvider(new List<RouteConfig>(), new List<ClusterConfig>() { cluster1 });
|
||||
var services = CreateServices(new[] { provider1, provider2 });
|
||||
var configManager = services.GetRequiredService<ProxyConfigManager>();
|
||||
|
||||
var ioEx = await Assert.ThrowsAsync<InvalidOperationException>(() => configManager.InitialLoadAsync());
|
||||
Assert.Equal("Unable to load or apply the proxy configuration.", ioEx.Message);
|
||||
var agex = Assert.IsType<AggregateException>(ioEx.InnerException);
|
||||
|
||||
Assert.Equal(2, agex.InnerExceptions.Count);
|
||||
var argex = Assert.IsType<ArgumentException>(agex.InnerExceptions.First());
|
||||
Assert.StartsWith($"Route 'route1' requires Hosts or Path specified", argex.Message);
|
||||
argex = Assert.IsType<ArgumentException>(agex.InnerExceptions.Skip(1).First());
|
||||
Assert.StartsWith($"Max connections per server limit set on the cluster 'cluster1' must be positive.", argex.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task LoadAsync_ConfigFilterRouteActions_CanFixBrokenRoute()
|
||||
{
|
||||
|
|
|
@ -31,6 +31,7 @@ public class Startup
|
|||
services.AddControllers();
|
||||
services.AddReverseProxy()
|
||||
.LoadFromConfig(_configuration.GetSection("ReverseProxy"))
|
||||
.LoadFromConfig(_configuration.GetSection("ReverseProxy2"))
|
||||
.AddConfigFilter<CustomConfigFilter>();
|
||||
}
|
||||
|
||||
|
|
|
@ -120,5 +120,53 @@
|
|||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"ReverseProxy2": {
|
||||
"Clusters": {
|
||||
"cluster3": {
|
||||
"LoadBalancingPolicy": "Random",
|
||||
"SessionAffinity": {
|
||||
"Enabled": "true",
|
||||
"Policy": "Cookie",
|
||||
"AffinityKeyName": ".Yarp.Affinity"
|
||||
},
|
||||
"HealthCheck": {
|
||||
"Active": {
|
||||
"Enabled": "true",
|
||||
"Interval": "00:00:10",
|
||||
"Timeout": "00:00:10",
|
||||
"Policy": "ConsecutiveFailures",
|
||||
"Path": "/api/health"
|
||||
},
|
||||
"Passive": {
|
||||
"Enabled": "true",
|
||||
"Policy": "TransportFailureRate",
|
||||
"ReactivationPeriod": "00:05:00"
|
||||
}
|
||||
},
|
||||
"Metadata": {
|
||||
"ConsecutiveFailuresHealthPolicy.Threshold": "3",
|
||||
"TransportFailureRateHealthPolicy.RateLimit": "0.5"
|
||||
},
|
||||
"Destinations": {
|
||||
"cluster1/destination1": {
|
||||
"Address": "https://localhost:10000/"
|
||||
},
|
||||
"cluster1/destination2": {
|
||||
"Address": "http://localhost:10010/"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"Routes": {
|
||||
"route3": {
|
||||
"ClusterId": "cluster3",
|
||||
"Match": {
|
||||
"Methods": [ "GET", "POST" ],
|
||||
"Hosts": [ "localhost" ],
|
||||
"Path": "/api2/{action}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче