Upgrade Resiliency: Adds Implementation for Validating the Unhealthy Backend Replicas in Direct mode (#3631)

* Code changes to implement replica validation in dotnet v3 sdk.

* Cosmetic changes to add inline code comments.

* Code chages to address review comments.

* Code changes to cover a scenario for async cache.

* Code changes to refactor async non-blocking cache code.

* Code changes to address minor review comments.

---------

Co-authored-by: Kiran Kumar Kolli <kirankk@microsoft.com>
This commit is contained in:
Debdatta Kunda 2023-03-01 08:02:15 -08:00 коммит произвёл GitHub
Родитель 0088c2f8e4
Коммит fd687f55dd
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 1180 добавлений и 26 удалений

Просмотреть файл

@ -120,30 +120,11 @@ namespace Microsoft.Azure.Cosmos
throw;
}
try
{
return await initialLazyValue.CreateAndWaitForBackgroundRefreshTaskAsync(
createRefreshTask: singleValueInitFunc);
}
catch (Exception e)
{
if (initialLazyValue.ShouldRemoveFromCacheThreadSafe())
{
DefaultTrace.TraceError(
"AsyncCacheNonBlocking.GetAsync with ForceRefresh Failed. key: {0}, Exception: {1}",
key,
e);
// In some scenarios when a background failure occurs like a 404
// the initial cache value should be removed.
if (this.removeFromCacheOnBackgroundRefreshException(e))
{
this.TryRemove(key);
}
}
throw;
}
return await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
key: key,
initialValue: initialLazyValue,
callbackDelegate: singleValueInitFunc,
operationName: nameof(GetAsync));
}
// The AsyncLazyWithRefreshTask is lazy and won't create the task until GetValue is called.
@ -196,6 +177,70 @@ namespace Microsoft.Azure.Cosmos
return this.values.TryRemove(key, out _);
}
/// <summary>
/// Refreshes the async non blocking cache on-demand for the given <paramref name="key"/>
/// and caches the result for later usage.
/// </summary>
/// <param name="key">The requested key to be refreshed.</param>
/// <param name="singleValueInitFunc">A func delegate to be invoked at a later point of time.</param>
public async Task RefreshAsync(
TKey key,
Func<TValue, Task<TValue>> singleValueInitFunc)
{
if (this.values.TryGetValue(key, out AsyncLazyWithRefreshTask<TValue> initialLazyValue))
{
await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
key: key,
initialValue: initialLazyValue,
callbackDelegate: singleValueInitFunc,
operationName: nameof(RefreshAsync));
}
}
/// <summary>
/// Creates a background task to invoke the callback delegate and updates the cache with the value returned from the delegate.
/// </summary>
/// <param name="key">The requested key to be updated.</param>
/// <param name="initialValue">An instance of <see cref="AsyncLazyWithRefreshTask{T}"/> containing the initial cached value.</param>
/// <param name="callbackDelegate">A func callback delegate to be invoked at a later point of time.</param>
/// <param name="operationName">A string indicating the operation on the cache.</param>
/// <returns>A <see cref="Task{TValue}"/> containing the updated, refreshed value.</returns>
private async Task<TValue> UpdateCacheAndGetValueFromBackgroundTaskAsync(
TKey key,
AsyncLazyWithRefreshTask<TValue> initialValue,
Func<TValue, Task<TValue>> callbackDelegate,
string operationName)
{
try
{
return await initialValue.CreateAndWaitForBackgroundRefreshTaskAsync(
createRefreshTask: callbackDelegate);
}
catch (Exception ex)
{
if (initialValue.ShouldRemoveFromCacheThreadSafe())
{
bool removed = false;
// In some scenarios when a background failure occurs like a 404
// the initial cache value should be removed.
if (this.removeFromCacheOnBackgroundRefreshException(ex))
{
removed = this.TryRemove(key);
}
DefaultTrace.TraceError(
"AsyncCacheNonBlocking Failed. key: {0}, operation: {1}, tryRemoved: {2}, Exception: {3}",
key,
operationName,
removed,
ex);
}
throw;
}
}
/// <summary>
/// This is AsyncLazy that has an additional Task that can
/// be used to update the value. This allows concurrent requests

Просмотреть файл

@ -46,6 +46,7 @@ namespace Microsoft.Azure.Cosmos.Routing
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly CosmosHttpClient httpClient;
private readonly bool isReplicaAddressValidationEnabled;
private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
@ -84,10 +85,27 @@ namespace Microsoft.Azure.Cosmos.Routing
GatewayAddressCache.ProtocolString(this.protocol));
this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariableAsBool(
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
defaultValue: false);
}
public Uri ServiceEndpoint => this.serviceEndpoint;
/// <summary>
/// Gets the address information from the gateway and sets them into the async non blocking cache for later lookup.
/// Additionally attempts to establish Rntbd connections to the backend replicas based on `shouldOpenRntbdChannels`
/// boolean flag.
/// </summary>
/// <param name="databaseName">A string containing the database name.</param>
/// <param name="collection">An instance of <see cref="ContainerProperties"/> containing the collection properties.</param>
/// <param name="partitionKeyRangeIdentities">A read only list containing the partition key range identities.</param>
/// <param name="shouldOpenRntbdChannels">A boolean flag indicating whether Rntbd connections are required to be established
/// to the backend replica nodes. For cosmos client initialization and cache warmups, the Rntbd connection are needed to be
/// openned deterministically to the backend replicas to reduce latency, thus the <paramref name="shouldOpenRntbdChannels"/>
/// should be set to `true` during cosmos client initialization and cache warmups. The OpenAsync flow from DocumentClient
/// doesn't require the connections to be opened deterministically thus should set the parameter to `false`.</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
public async Task OpenConnectionsAsync(
string databaseName,
ContainerProperties collection,
@ -161,6 +179,10 @@ namespace Microsoft.Azure.Cosmos.Routing
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);
// The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the
// backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as
// `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any
// other flow, the flag should be passed as `false`.
if (this.openConnectionsHandler != null && shouldOpenRntbdChannels)
{
await this.openConnectionsHandler
@ -178,6 +200,7 @@ namespace Microsoft.Azure.Cosmos.Routing
this.openConnectionsHandler = openConnectionsHandler;
}
/// <inheritdoc/>
public async Task<PartitionAddressInformation> TryGetAddressesAsync(
DocumentServiceRequest request,
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
@ -229,6 +252,7 @@ namespace Microsoft.Azure.Cosmos.Routing
return this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: currentCachedValue,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: forceRefreshPartitionAddresses);
@ -259,6 +283,7 @@ namespace Microsoft.Azure.Cosmos.Routing
key: partitionKeyRangeIdentity,
singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: null,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: false),
@ -278,6 +303,27 @@ namespace Microsoft.Azure.Cosmos.Routing
this.suboptimalServerPartitionTimestamps.TryAdd(partitionKeyRangeIdentity, DateTime.UtcNow);
}
// Refresh the cache on-demand, if there were some address that remained as unhealthy long enough (more than 1 minute)
// and need to revalidate its status. The reason it is not dependent on 410 to force refresh the addresses, is being:
// When an address is marked as unhealthy, then the address enumerator will deprioritize it and move it back to the
// end of the transport uris list. Therefore, it could happen that no request will land on the unhealthy address for
// an extended period of time therefore, the chances of 410 (Gone Exception) to trigger the forceRefresh workflow may
// not happen for that particular replica.
if (addresses
.Get(Protocol.Tcp)
.ReplicaTransportAddressUris
.Any(x => x.ShouldRefreshHealthStatus()))
{
Task refreshAddressesInBackgroundTask = Task.Run(async () => await this.serverPartitionAddressCache.RefreshAsync(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: currentCachedValue,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true)));
}
return addresses;
}
catch (DocumentClientException ex)
@ -384,6 +430,7 @@ namespace Microsoft.Azure.Cosmos.Routing
key: partitionKeyRangeIdentity,
singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
null,
cachedAddresses: null,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true),
@ -444,6 +491,7 @@ namespace Microsoft.Azure.Cosmos.Routing
private async Task<PartitionAddressInformation> GetAddressesForRangeIdAsync(
DocumentServiceRequest request,
PartitionAddressInformation cachedAddresses,
string collectionRid,
string partitionKeyRangeId,
bool forceRefresh)
@ -475,6 +523,32 @@ namespace Microsoft.Azure.Cosmos.Routing
throw new PartitionKeyRangeGoneException(errorMessage) { ResourceAddress = collectionRid };
}
if (this.isReplicaAddressValidationEnabled)
{
// The purpose of this step is to merge the new transport addresses with the old one. What this means is -
// 1. If a newly returned address from gateway is already a part of the cache, then restore the health state
// of the new address with that of the cached one.
// 2. If a newly returned address from gateway doesn't exist in the cache, then keep using the new address
// with `Unknown` (initial) status.
PartitionAddressInformation mergedAddresses = GatewayAddressCache.MergeAddresses(result.Item2, cachedAddresses);
IReadOnlyList<TransportAddressUri> transportAddressUris = mergedAddresses.Get(Protocol.Tcp)?.ReplicaTransportAddressUris;
// If cachedAddresses are null, that would mean that the returned address from gateway would remain in Unknown
// status and there is no cached state that could transition them into Unhealthy.
if (cachedAddresses != null)
{
foreach (TransportAddressUri address in transportAddressUris)
{
// The main purpose for this step is to move address health status from Unhealthy to UnhealthyPending.
address.SetRefreshedIfUnhealthy();
}
}
this.ValidateUnhealthyPendingReplicas(transportAddressUris);
return mergedAddresses;
}
return result.Item2;
}
}
@ -760,6 +834,86 @@ namespace Microsoft.Azure.Cosmos.Routing
}
}
/// <summary>
/// Validates the unhealthy pending replicas by attempting to open the Rntbd connection. This operation
/// will eventually marks the unhealthy pending replicas to healthy, if the rntbd connection attempt made was
/// successful or unhealthy otherwise.
/// </summary>
/// <param name="addresses">A read-only list of <see cref="TransportAddressUri"/> needs to be validated.</param>
private void ValidateUnhealthyPendingReplicas(
IReadOnlyList<TransportAddressUri> addresses)
{
if (addresses == null)
{
throw new ArgumentNullException(nameof(addresses));
}
IEnumerable<TransportAddressUri> addressesNeedToValidation = addresses
.Where(address => address
.GetCurrentHealthState()
.GetHealthStatus() == TransportAddressHealthState.HealthStatus.UnhealthyPending);
if (addressesNeedToValidation.Any())
{
Task openConnectionsInBackgroundTask = Task.Run(async () => await this.openConnectionsHandler.TryOpenRntbdChannelsAsync(
addresses: addressesNeedToValidation.ToList()));
}
}
/// <summary>
/// Merge the new addresses returned from gateway service with that of the cached addresses. If the returned
/// new addresses list contains some of the addresses, which are already cached, then reset the health state
/// of the new address to that of the cached one. If the the new addresses doesn't contain any of the cached
/// addresses, then keep using the health state of the new addresses, which should be `unknown`.
/// </summary>
/// <param name="newAddresses">A list of <see cref="PartitionAddressInformation"/> containing the latest
/// addresses being returned from gateway.</param>
/// <param name="cachedAddresses">A list of <see cref="PartitionAddressInformation"/> containing the cached
/// addresses from the async non blocking cache.</param>
/// <returns>A list of <see cref="PartitionAddressInformation"/> containing the merged addresses.</returns>
private static PartitionAddressInformation MergeAddresses(
PartitionAddressInformation newAddresses,
PartitionAddressInformation cachedAddresses)
{
if (newAddresses == null)
{
throw new ArgumentNullException(nameof(newAddresses));
}
if (cachedAddresses == null)
{
return newAddresses;
}
PerProtocolPartitionAddressInformation currentAddressInfo = newAddresses.Get(Protocol.Tcp);
PerProtocolPartitionAddressInformation cachedAddressInfo = cachedAddresses.Get(Protocol.Tcp);
Dictionary<string, TransportAddressUri> cachedAddressDict = new ();
foreach (TransportAddressUri transportAddressUri in cachedAddressInfo.ReplicaTransportAddressUris)
{
cachedAddressDict[transportAddressUri.ToString()] = transportAddressUri;
}
foreach (TransportAddressUri transportAddressUri in currentAddressInfo.ReplicaTransportAddressUris)
{
if (cachedAddressDict.ContainsKey(transportAddressUri.ToString()))
{
TransportAddressUri cachedTransportAddressUri = cachedAddressDict[transportAddressUri.ToString()];
transportAddressUri.ResetHealthStatus(
status: cachedTransportAddressUri.GetCurrentHealthState().GetHealthStatus(),
lastUnknownTimestamp: cachedTransportAddressUri.GetCurrentHealthState().GetLastKnownTimestampByHealthStatus(
healthStatus: TransportAddressHealthState.HealthStatus.Unknown),
lastUnhealthyPendingTimestamp: cachedTransportAddressUri.GetCurrentHealthState().GetLastKnownTimestampByHealthStatus(
healthStatus: TransportAddressHealthState.HealthStatus.UnhealthyPending),
lastUnhealthyTimestamp: cachedTransportAddressUri.GetCurrentHealthState().GetLastKnownTimestampByHealthStatus(
healthStatus: TransportAddressHealthState.HealthStatus.Unhealthy));
}
}
return newAddresses;
}
protected virtual void Dispose(bool disposing)
{
if (this.disposedValue)

Просмотреть файл

@ -6,16 +6,15 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Serialization.HybridRow;
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
@ -872,6 +871,436 @@ namespace Microsoft.Azure.Cosmos
expectedSuccessCount: addresses.Count);
}
/// <summary>
/// Test to validate that when replica validation is enabled and force address refresh happens to fetch the latest address from gateway,
/// if in case the gateway returns the same address which was previously unhealthy, the gateway address cache resets the returned status
/// to unhealthy and validates that replica using the open connection handler and finally marks it to connected.
/// </summary>
[TestMethod]
[Owner("dkunda")]
public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabled_ShouldValidateUnhealthyReplicasHealth()
{
// Arrange.
ManualResetEvent manualResetEvent = new(initialState: false);
Mock<IHttpHandler> mockHttpHandler = new (MockBehavior.Strict);
string oldAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/4s";
string newAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/5s";
string addressTobeMarkedUnhealthy = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/2s";
mockHttpHandler.SetupSequence(x => x.SendAsync(
It.IsAny<HttpRequestMessage>(),
It.IsAny<CancellationToken>()))
.Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List<string>()
{
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p",
addressTobeMarkedUnhealthy,
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s",
oldAddress,
}))
.Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List<string>()
{
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p",
addressTobeMarkedUnhealthy,
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s",
newAddress,
}));
FakeOpenConnectionHandler fakeOpenConnectionHandler = new (
failingIndexes: new HashSet<int>(),
manualResetEvent: manualResetEvent);
HttpClient httpClient = new (new HttpHandlerHelper(mockHttpHandler.Object));
GatewayAddressCache cache = new (
new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint),
Documents.Client.Protocol.Tcp,
this.mockTokenProvider.Object,
this.mockServiceConfigReader.Object,
MockCosmosUtil.CreateCosmosHttpClient(() => httpClient),
openConnectionsHandler: fakeOpenConnectionHandler,
suboptimalPartitionForceRefreshIntervalInSeconds: 2,
enableTcpConnectionEndpointRediscovery: true);
// By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature
// for the purpose of this test.
FieldInfo fieldInfo = cache
.GetType()
.GetField(
name: "isReplicaAddressValidationEnabled",
bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic);
fieldInfo.SetValue(
obj: cache,
value: true);
DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid);
// Act and Assert.
PartitionAddressInformation addressInfo = await cache.TryGetAddressesAsync(
request: request,
partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity,
serviceIdentity: this.serviceIdentity,
forceRefreshPartitionAddresses: false,
cancellationToken: CancellationToken.None);
TransportAddressUri refreshedUri = addressInfo
.Get(Documents.Client.Protocol.Tcp)?
.ReplicaTransportAddressUris
.Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy));
Assert.IsNotNull(refreshedUri);
Assert.AreEqual(
expected: TransportAddressHealthState.HealthStatus.Unknown,
actual: refreshedUri.GetCurrentHealthState().GetHealthStatus());
Assert.AreEqual(4, addressInfo.AllAddresses.Count);
Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress));
Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress));
// Because force refresh is requested, an unhealthy replica is added to the failed endpoint so that it's status could be validted.
request.RequestContext.FailedEndpoints.Value.Add(
new TransportAddressUri(
addressUri: new Uri(
uriString: addressTobeMarkedUnhealthy)));
addressInfo = await cache.TryGetAddressesAsync(
request: request,
partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity,
serviceIdentity: this.serviceIdentity,
forceRefreshPartitionAddresses: true,
cancellationToken: CancellationToken.None);
Assert.AreEqual(4, addressInfo.AllAddresses.Count);
Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress));
Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress));
// Waits until a completion signal from the background task is received.
GatewayAddressCacheTests.WaitForManualResetEventSignal(
manualResetEvent: manualResetEvent,
shouldReset: false);
refreshedUri = addressInfo
.Get(Documents.Client.Protocol.Tcp)?
.ReplicaTransportAddressUris
.Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy));
Assert.IsNotNull(refreshedUri);
Assert.AreEqual(
expected: TransportAddressHealthState.HealthStatus.Connected,
actual: refreshedUri.GetCurrentHealthState().GetHealthStatus());
mockHttpHandler.VerifyAll();
GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes(
fakeOpenConnectionHandler: fakeOpenConnectionHandler,
expectedExceptionCount: 0,
expectedMethodInvocationCount: 1,
expectedReceivedAddressesCount: 1,
expectedSuccessCount: 1);
}
/// <summary>
/// Test to validate that when replica validation is enabled and there exists a replica such that it remained unhealthy for a period of one minute or
/// more, then even though a force refresh is not requested, the unhealthy replicas at least get a chance to re-validate it's status by the
/// on-demand async non-blocking cache refresh flow and eventually marks itself as healthy once the open connection attempt is successful.
/// </summary>
[TestMethod]
[Owner("dkunda")]
public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabledAndUnhealthyUriExistsForOneMinute_ShouldForceRefreshUnhealthyReplicas()
{
// Arrange.
ManualResetEvent manualResetEvent = new (initialState: false);
Mock<IHttpHandler> mockHttpHandler = new(MockBehavior.Strict);
string oldAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/4s";
string newAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/5s";
string addressTobeMarkedUnhealthy = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/2s";
mockHttpHandler.SetupSequence(x => x.SendAsync(
It.IsAny<HttpRequestMessage>(),
It.IsAny<CancellationToken>()))
.Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List<string>()
{
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p",
addressTobeMarkedUnhealthy,
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s",
oldAddress,
}))
.Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List<string>()
{
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p",
addressTobeMarkedUnhealthy,
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s",
newAddress,
}))
.Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List<string>()
{
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p",
addressTobeMarkedUnhealthy,
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s",
newAddress,
}));
FakeOpenConnectionHandler fakeOpenConnectionHandler = new (
failIndexesByAttempts: new Dictionary<int, HashSet<int>>()
{
{ 0, new HashSet<int>() { 0 } }
},
manualResetEvent: manualResetEvent);
HttpClient httpClient = new (new HttpHandlerHelper(mockHttpHandler.Object));
GatewayAddressCache cache = new (
new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint),
Documents.Client.Protocol.Tcp,
this.mockTokenProvider.Object,
this.mockServiceConfigReader.Object,
MockCosmosUtil.CreateCosmosHttpClient(() => httpClient),
openConnectionsHandler: fakeOpenConnectionHandler,
suboptimalPartitionForceRefreshIntervalInSeconds: 2,
enableTcpConnectionEndpointRediscovery: true);
// By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature
// for the purpose of this test.
FieldInfo fieldInfo = cache
.GetType()
.GetField(
name: "isReplicaAddressValidationEnabled",
bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic);
fieldInfo.SetValue(
obj: cache,
value: true);
DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid);
// Act and Assert.
PartitionAddressInformation addressInfo = await cache.TryGetAddressesAsync(
request: request,
partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity,
serviceIdentity: this.serviceIdentity,
forceRefreshPartitionAddresses: false,
cancellationToken: CancellationToken.None);
TransportAddressUri refreshedUri = addressInfo
.Get(Documents.Client.Protocol.Tcp)?
.ReplicaTransportAddressUris
.Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy));
Assert.IsNotNull(refreshedUri);
Assert.AreEqual(
expected: TransportAddressHealthState.HealthStatus.Unknown,
actual: refreshedUri.GetCurrentHealthState().GetHealthStatus());
Assert.AreEqual(4, addressInfo.AllAddresses.Count);
Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress));
Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress));
// Because force refresh is requested, an unhealthy replica is added to the failed endpoint so that it's health status could be validted.
request.RequestContext.FailedEndpoints.Value.Add(
new TransportAddressUri(
addressUri: new Uri(
uriString: addressTobeMarkedUnhealthy)));
addressInfo = await cache.TryGetAddressesAsync(
request: request,
partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity,
serviceIdentity: this.serviceIdentity,
forceRefreshPartitionAddresses: true,
cancellationToken: CancellationToken.None);
Assert.AreEqual(4, addressInfo.AllAddresses.Count);
Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress));
Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress));
// Waits until a completion signal from the background task is received.
GatewayAddressCacheTests.WaitForManualResetEventSignal(
manualResetEvent: manualResetEvent,
shouldReset: true);
// During the replica validation flow, the connection attempt was not successful thus the replica
// was marked unhealthy.
refreshedUri = addressInfo
.Get(Documents.Client.Protocol.Tcp)?
.ReplicaTransportAddressUris
.Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy));
Assert.IsNotNull(refreshedUri);
Assert.AreEqual(
expected: TransportAddressHealthState.HealthStatus.Unhealthy,
actual: refreshedUri.GetCurrentHealthState().GetHealthStatus());
GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes(
fakeOpenConnectionHandler: fakeOpenConnectionHandler,
expectedExceptionCount: 1,
expectedMethodInvocationCount: 1,
expectedReceivedAddressesCount: 1,
expectedSuccessCount: 0);
// A delay of 2 minute was added to make the replica unhealthy for more than one minute. This
// will make sure the unhealthy replica gets a chance to re-validate it's health status.
ReflectionUtils.AddMinuteToDateTimeFieldUsingReflection(
objectName: refreshedUri.GetCurrentHealthState(),
fieldName: "lastUnhealthyTimestamp",
delayInMinutes: -2);
addressInfo = await cache.TryGetAddressesAsync(
request: request,
partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity,
serviceIdentity: this.serviceIdentity,
forceRefreshPartitionAddresses: false,
cancellationToken: CancellationToken.None);
// Waits until a completion signal from the background task is received.
GatewayAddressCacheTests.WaitForManualResetEventSignal(
manualResetEvent: manualResetEvent,
shouldReset: false);
Assert.AreEqual(4, addressInfo.AllAddresses.Count);
GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes(
fakeOpenConnectionHandler: fakeOpenConnectionHandler,
expectedExceptionCount: 1,
expectedMethodInvocationCount: 2,
expectedReceivedAddressesCount: 1,
expectedSuccessCount: 1);
addressInfo = await cache.TryGetAddressesAsync(
request: request,
partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity,
serviceIdentity: this.serviceIdentity,
forceRefreshPartitionAddresses: false,
cancellationToken: CancellationToken.None);
refreshedUri = addressInfo
.Get(Documents.Client.Protocol.Tcp)?
.ReplicaTransportAddressUris
.Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy));
// Because the open connection attempt to the unhealthy replica was successful, the replica was
// marked as healthy.
mockHttpHandler.VerifyAll();
Assert.AreEqual(4, addressInfo.AllAddresses.Count);
Assert.IsNotNull(refreshedUri);
Assert.AreEqual(
expected: TransportAddressHealthState.HealthStatus.Connected,
actual: refreshedUri.GetCurrentHealthState().GetHealthStatus());
// This assertion makes sure that no additional calls were made to the open connection handler after
// since the last address refresh, because all the replicas are now either Unknown or Connected.
GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes(
fakeOpenConnectionHandler: fakeOpenConnectionHandler,
expectedExceptionCount: 1,
expectedMethodInvocationCount: 2,
expectedReceivedAddressesCount: 1,
expectedSuccessCount: 1);
}
/// <summary>
/// Test to validate that when replica validation is disabled and there exists some unhealthy replicas, the gateway address
/// cache doesn't validate the health state of the unhealthy replicas.
/// </summary>
[TestMethod]
[Owner("dkunda")]
public async Task TryGetAddressesAsync_WhenReplicaVlidationDisabled_ShouldNotValidateUnhealthyReplicas()
{
// Arrange.
Mock<IHttpHandler> mockHttpHandler = new(MockBehavior.Strict);
string oldAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/4s";
string newAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/5s";
string addressTobeMarkedUnhealthy = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/2s";
mockHttpHandler.SetupSequence(x => x.SendAsync(
It.IsAny<HttpRequestMessage>(),
It.IsAny<CancellationToken>()))
.Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List<string>()
{
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p",
addressTobeMarkedUnhealthy,
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s",
oldAddress,
}))
.Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List<string>()
{
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p",
addressTobeMarkedUnhealthy,
"rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s",
newAddress,
}));
FakeOpenConnectionHandler fakeOpenConnectionHandler = new(failingIndexes: new HashSet<int>());
HttpClient httpClient = new(new HttpHandlerHelper(mockHttpHandler.Object));
GatewayAddressCache cache = new(
new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint),
Documents.Client.Protocol.Tcp,
this.mockTokenProvider.Object,
this.mockServiceConfigReader.Object,
MockCosmosUtil.CreateCosmosHttpClient(() => httpClient),
openConnectionsHandler: fakeOpenConnectionHandler,
suboptimalPartitionForceRefreshIntervalInSeconds: 2,
enableTcpConnectionEndpointRediscovery: true);
DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid);
// Act and Assert.
PartitionAddressInformation addressInfo = await cache.TryGetAddressesAsync(
request: request,
partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity,
serviceIdentity: this.serviceIdentity,
forceRefreshPartitionAddresses: false,
cancellationToken: CancellationToken.None);
Assert.AreEqual(4, addressInfo.AllAddresses.Count);
Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress));
Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress));
// Because force refresh is requested, an unhealthy replica is added to the failed endpoint so that it's status could be validted.
request.RequestContext.FailedEndpoints.Value.Add(
new TransportAddressUri(
addressUri: new Uri(
uriString: addressTobeMarkedUnhealthy)));
addressInfo = await cache.TryGetAddressesAsync(
request: request,
partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity,
serviceIdentity: this.serviceIdentity,
forceRefreshPartitionAddresses: true,
cancellationToken: CancellationToken.None);
Assert.AreEqual(4, addressInfo.AllAddresses.Count);
Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress));
Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress));
TransportAddressUri refreshedUri = addressInfo
.Get(Documents.Client.Protocol.Tcp)?
.ReplicaTransportAddressUris
.Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy));
Assert.IsNotNull(refreshedUri);
Assert.AreEqual(
expected: TransportAddressHealthState.HealthStatus.Unknown,
actual: refreshedUri.GetCurrentHealthState().GetHealthStatus());
mockHttpHandler.VerifyAll();
GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes(
fakeOpenConnectionHandler: fakeOpenConnectionHandler,
expectedExceptionCount: 0,
expectedMethodInvocationCount: 0,
expectedReceivedAddressesCount: 0,
expectedSuccessCount: 0);
}
/// <summary>
/// Blocks the current thread until a completion signal on the ManualResetEvent
/// is received. A timeout of 5 seconds is added to avoid any thread starvation.
/// </summary>
/// <param name="manualResetEvent">An instance of <see cref="ManualResetEvent"/>.</param>
/// <param name="shouldReset">A boolean flag indicating if a Reset on the manualResetEvent is required.</param>
private static void WaitForManualResetEventSignal(
ManualResetEvent manualResetEvent,
bool shouldReset)
{
manualResetEvent.WaitOne(
millisecondsTimeout: 5000);
if (shouldReset)
{
manualResetEvent.Reset();
}
}
/// <summary>
/// Helper method to assert on the <see cref="FakeOpenConnectionHandler"/> class attributes
/// to match with that of the expected ones.

Просмотреть файл

@ -392,5 +392,150 @@ namespace Microsoft.Azure.Cosmos.Tests.Routing
Assert.AreEqual(1, totalLazyCalls);
Assert.AreEqual("Test3", result);
}
/// <summary>
/// Test to validate that when RefreshAsync() is invoked for a valid existing key, the
/// cache refreshes the key successfully and the new value is updated in the cache.
/// </summary>
[TestMethod]
[Owner("dkunda")]
public async Task RefreshAsync_WhenRefreshRequestedForAnExistingKey_ShouldRefreshTheCache()
{
// Arrange.
AsyncCacheNonBlocking<string, string> asyncCache = new ();
// Act and Assert.
string result = await asyncCache.GetAsync(
"key",
(_) => Task.FromResult("value1"),
(_) => false);
Assert.AreEqual("value1", result);
await asyncCache.RefreshAsync(
"key",
(_) => Task.FromResult("value2"));
result = await asyncCache.GetAsync(
"key",
(_) => throw new Exception("Should not refresh."),
(_) => false);
Assert.AreEqual("value2", result);
}
/// <summary>
/// Test to validate that when a DocumentClientException is thrown during RefreshAsync() operation,
/// then the cache removes the key for which a refresh was requested.
/// </summary>
[TestMethod]
[Owner("dkunda")]
public async Task RefreshAsync_WhenThrowsDocumentClientException_ShouldRemoveKeyFromTheCache()
{
// Arrange.
AsyncCacheNonBlocking<string, string> asyncCache = new ();
// Act and Assert.
string result = await asyncCache.GetAsync(
"key",
(_) => Task.FromResult("value1"),
(_) => false);
Assert.AreEqual("value1", result);
result = await asyncCache.GetAsync(
"key",
(_) => Task.FromResult("value2"),
(_) => false);
// Because the key is already present in the cache and a force refresh was not requested
// the func delegate should not get invoked and thus the cache should not be updated
// and still return the old cached value.
Assert.AreEqual("value1", result);
NotFoundException notFoundException = new (
message: "Item was deleted.");
try
{
await asyncCache.RefreshAsync(
"key",
async (_) =>
{
await Task.Delay(TimeSpan.FromMilliseconds(5));
throw notFoundException;
});
Assert.Fail("Should throw a NotFoundException");
}
catch (NotFoundException exception)
{
Assert.AreEqual(notFoundException, exception);
}
// Because the key was deleted from the cache, the func delegate should get invoked at
// this point and update the value to value2.
result = await asyncCache.GetAsync(
"key",
(_) => Task.FromResult("value2"),
(_) => false);
Assert.AreEqual("value2", result);
}
/// <summary>
/// Test to validate that when some other Exception is thrown during RefreshAsync() operation,
/// then the cache does not remove the key for which the refresh was originally requested.
/// </summary>
[TestMethod]
[Owner("dkunda")]
public async Task RefreshAsync_WhenThrowsOtherException_ShouldNotRemoveKeyFromTheCache()
{
// Arrange.
AsyncCacheNonBlocking<string, string> asyncCache = new();
// Act and Assert.
string result = await asyncCache.GetAsync(
"key",
(_) => Task.FromResult("value1"),
(_) => false);
Assert.AreEqual("value1", result);
result = await asyncCache.GetAsync(
"key",
(_) => Task.FromResult("value2"),
(_) => false);
// Because the key is already present in the cache and a force refresh was not requested
// the func delegate should not get invoked and thus the cache should not be updated
// and still return the old cached value.
Assert.AreEqual("value1", result);
Exception exception = new(
message: "Timeout exception.");
try
{
await asyncCache.RefreshAsync(
"key",
async (_) =>
{
await Task.Delay(TimeSpan.FromMilliseconds(5));
throw exception;
});
Assert.Fail("Should throw a NotFoundException");
}
catch (Exception ex)
{
Assert.AreEqual(ex, exception);
}
// Because the key should not get deleted from the cache, the func delegate should not get invoked at
// this point.
result = await asyncCache.GetAsync(
"key",
(_) => Task.FromResult("value2"),
(_) => false);
Assert.AreEqual("value1", result);
}
}
}

Просмотреть файл

@ -0,0 +1,43 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Tests
{
using System;
using System.Reflection;
/// <summary>
/// Common utility class for reflaction related operations.
/// </summary>
internal static class ReflectionUtils
{
/// <summary>
/// This helper method uses reflection to set the private and read only fields
/// to the disered values to help the test cases mimic the expected behavior.
/// </summary>
/// <param name="objectName">An object where reflection will be applied to update the field.</param>
/// <param name="fieldName">A string containing the internal field name.</param>
/// <param name="delayInMinutes">An integer to add or substract the desired delay in minutes.</param>
internal static void AddMinuteToDateTimeFieldUsingReflection(
object objectName,
string fieldName,
int delayInMinutes)
{
FieldInfo fieldInfo = objectName
.GetType()
.GetField(
name: fieldName,
bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic);
DateTime? fieldValue = (DateTime?)fieldInfo
.GetValue(
obj: objectName);
fieldInfo
.SetValue(
obj: objectName,
value: ((DateTime)fieldValue).AddMinutes(delayInMinutes));
}
}
}

Просмотреть файл

@ -0,0 +1,338 @@
# Design Approach to Validate Backend Replicas During Service Upgrade in Direct Mode
## Table of Contents
* [Scope.](#scope)
* [Backgraound.](#backgraound)
* [Proposed Solution.](#proposed-solution)
* [Design Approach.](#design-approach)
* [Outline.](#outline)
* [Updated Sequence Diagram for `CosmosClient` initialization.](#updated-sequence-diagram-for-cosmosclient-initialization)
* [Sequence Diagram when `StoreReader` invokes the `GatewayAddressCache` to resolve addresses and leverages `AddressEnumerator` to enumerate the transport addresses.](#sequence-diagram-when-storereader-invokes-the-gatewayaddresscache-to-resolve-addresses-and-leverages-addressenumerator-to-enumerate-the-transport-addresses)
* [State Diagram to Understand the `TransportAddressUri` Health State Transformations.](#state-diagram-to-understand-the-transportaddressuri-health-state-transformations)
* [`Microsoft.Azure.Cosmos.Direct` package class diagrams.](#azurecosmosdirect-package-class-diagrams)
* [`Microsoft.Azure.Cosmos` package class diagrams.](#microsoftazurecosmos-package-class-diagrams)
* [Pull Request with Sample Code Changes.](#pull-request-with-sample-code-changes)
* [References.](#references)
## Scope
The scope of the replica validation workstream is targed for the `CosmosClient` configured for `Direct` mode.
## Backgraund
During an upgrade scenario in the backend replica nodes, there has been an observation of increased request latency. One of the primary reason for the latency is that, during an upgrade, a replica which is still undergoing upgrade may still be returned back to SDK, when an address refresh occurres. As of today, the incoming request will have `25%` chance to hit the replica that not ready yet, therefore causing the `ConnectionTimeoutException`, which contributes to the increased latency.
To understand the problem statement better, please take a look at the below sequence diagram which reflects the connection timeouts caused by the replica upgrade.
```mermaid
sequenceDiagram
autonumber
participant A as StoreReader <br> [Direct Code]
participant B as GlobalAddressResolver <br> [v3 Code]
participant C as GatewayAddressCache <br> [v3 Code]
participant D as GatewayService <br> [External Service]
participant E as BackendReplica <br> [A Replica Node Still Undergoing Upgrade <br> Address: rntbd://test.azure.com:443/partitions/2s]
A->>+B: Request (forceRefresh - false)
B->>+C: TryGetAddresses (forceRefresh - false)
C->>-B: Fetch Cached Addresses <br> rntbd://test.azure.com:443/partitions/2s
B->>-A: Return Addresses <br> rntbd://test.azure.com:443/partitions/2s
A->>+E: Request Sent to Backend Replica
E-x-A: Request fails with 410 GoneException
A->>+B: Request (forceRefresh - true) <br> GoneWithRetryAttempt
B->>+C: TryGetAddresses (forceRefresh - true)
C->>+D: GetServerAddresses
D->>-C: Returns the new refreshed addresses <br> rntbd://test.azure.com:443/partitions/2s
Note over D: Note that the returned addresses from <br> GatewayService may still undergoing <br> the upgrade, thus and they are not in a ready state.
C->>-B: Returns the refreshed addresses <br> rntbd://test.azure.com:443/partitions/2s
B->>-A: Returns the refreshed addresses <br> rntbd://test.azure.com:443/partitions/2s
A->>+E: Request Sent to Backend Replica
E-x-A: Request fails again with 410 GoneException
Note over A: Note that the request fails to connect to the replica <br> which causes a "ConnectionTimeoutException".
```
## Proposed Solution
The .NET SDK will track the replica endpoint health based on client side metrics, and de-prioritize any replica which were marked as - `Unhealthy`. SDK will validate the health of the replica by attempting to open RNTBD connections to the backend. When SDK refresh addresses back from gateway for a partition, **SDK will only validate the replica/s which were in `Unhealthy` status, by opening RNTBD connection requests**. This process will be completed with best effort, which means:
- If the validation can not finish within `1 min` of opening connections, the de-prioritize will stop for certain status.
- The selection of the replica will not be blocked by the validation process. To better understand this - if a request needs to be sent to `N` replicas, and if there is only `N-1` replica in good status, it will still go ahead selecting the `Nth` replica which needs validation.
- It is opt in only for now, by setting the environment variable `AZURE_COSMOS_REPLICA_VALIDATION_ENABLED` to `true`.
## Design Approach
### Outline
The basic idea for this design approach has been divited into *three* parts, which has been mentioned below in detail:
- Maintain `4` new health statuses into the `TransportAddressUri`, which are :
- **`Connected`** (Indicates that there is already a connection made successfully to the backend replica)
- **`Unknown`** (Indicates that the connection exists however the status is unknown at the moment)
- **`Unhealthy Pending`** (Indicates that the connection was unhealthy previously, but an attempt will be made to validate the replica to check the current status)
- **`Unhealthy`** (Indicates that the connection is unhealthy at the moment)
- Validate the `Unhealthy` replicas returned from the Address Cache, by attempting to open the RNTBD connection. Note that the validation task and the connection opening part has been done as a background task, so that the address resolving doesn't wait on the RNTBD context negotiation to finish.
- Leverage the `AddressEnumerator` to reorder the replicas by their health statuses. For instance, if the replica validation is enabled, the `AddressEnumerator` will reorder the replicas by sorting them in the order of **Connected/ Unknown** > **Unhealthy Pending** > **Unhealthy**. However, if the replica validation is disabled, the replicas will be sorted in the order of **Connected/ Unknown/ Unhealthy Pending** > **Unhealthy**.
Prior discussing the replica validation, it is very important to understand the changes in the flow while opening the RNTBD connections to the backend replica nodes, during the `CosmosClient` initialization. The changes in the flow are mentioned below as an updated sequence diagram.
### Updated Sequence Diagram for `CosmosClient` initialization.
```mermaid
sequenceDiagram
participant A as CosmosClient <br> [v3 Code]
participant B as ClientContextCore <br> [v3 Code]
participant C as DocumentClient <br> [v3 Code]
participant D as ServerStoreModel <br> [Direct Code]
participant K as StoreClientFactory <br> [Direct Code]
participant E as StoreClient <br> [Direct Code]
participant F as ReplicatedResourceClient <br> [Direct Code]
participant G as GlobalAddressResolver <br> [v3 Code]
participant H as GatewayAddressCache <br> [v3 Code]
participant J as RntbdOpenConnectionHandler <br> [Direct Code]
participant I as TransportClient <br> [Direct Code]
A->>B: 1. InitializeContainerWithRntbdAsync()
B->>C: 2. OpenConnectionsAsync()
C->>C: 2.1. CreateStoreModel()
C->>K: 3. CreateStoreClient(addressResolver)
K->>E: 4. new StoreClient(addressResolver)
E->>G: 5. SetOpenConnectionsHandler(new RntbdOpenConnectionHandler(transportClient))
Note over E: a) Creates a new instance of RntbdOpenConnectionHandler <br> and sets it to the IAddressResolverExtension. <br> Note that the GlobalAddressResolver implements the <br> IAddressResolverExtension today. <br> b) Sets the IAddressResolverExtension to the Replicated <br> ResourceClient.
G->>H: 6. SetOpenConnectionsHandler(openConnectionHandler)
C->>D: 7. OpenConnectionsToAllReplicasAsync()
D->>E: 8. OpenConnectionsToAllReplicasAsync()
E->>F: 9. OpenConnectionsToAllReplicasAsync()
F->>G: 10. OpenConnectionsToAllReplicasAsync()
G->>G: 10.1 collection = collectionCache.<br>ResolveByNameAsync()
G->>G: 10.2 partitionKeyRanges = routingMapProvider.<br>TryGetOverlappingRangesAsync(FullRange)
G->>H: 11. OpenAsync<br>(partitionKeyRangeIdentities)
Note over G: Resolves the collection by the <br> container link url and fetches <br> the partition key full ranges.
H->>H: 11.1 GetServerAddressesViaGatewayAsync()
H->>J: 12. OpenRntbdChannelsAsync()
Note over H: Gets the transport address uris from address info <br> and invokes the RntbdOpenConnectionHandler <br> OpenRntbdChannelsAsync() method with the transport uris <br> to establish the Rntbd connection.
J->>I: 13. OpenConnectionAsync() <br> using the resolved transport address uris.
```
Now that we are well aware of the changes in the RNTBD open connection flow, let's leverage the changes in the replica validation flow. The below sequence diagram describes the request and response flow from `StoreReader` (present in the `Cosmos.Direct` namespace) to the `GatewayAddressCache` (present in the `Microsoft.Azure.Cosmos` namespace), during the read request to backend replica/s.
### Sequence Diagram when `StoreReader` invokes the `GatewayAddressCache` to resolve addresses and leverages `AddressEnumerator` to enumerate the transport addresses.
```mermaid
sequenceDiagram
participant A as StoreReader <br> [Direct Code]
participant B as AddressSelector <br> [v3 Code]
participant C as GlobalAddressResolver <br> [v3 Code]
participant D as AddressResolver <br> [v3 Code]
participant E as GatewayAddressCache <br> [v3 Code]
participant F as RntbdOpenConnectionHandler <br> [Direct Code]
participant G as AsyncCacheNonBlocking <br> [v3 Code]
participant H as GatewayService <br> [external service]
participant I as AddressEnumerator <br> [Direct Code]
participant J as TransportClient <br> [Direct Code]
participant K as Channel <br> [Direct Code]
participant L as TransportAddressUri <br> [Direct Code]
A->>A: 1. ReadMultipleReplicaAsync()
A->>B: 2. ResolveAllTransportAddressUriAsync(forceRefresh - false)
B->>C: 3. ResolveAsync(forceRefresh - false)
C->>D: 4. ResolveAsync(forceRefresh - false)
D->>D: 5. ResolveAddressesAndIdentityAsync()
D->>E: 6. TryGetAddressesAsync(forceRefresh - false)
E->>G: 7. GetAsync ("singleValueInitFunc delegate", "forceRefresh - false")
Note over L: Initial health status of a <br> TransportAddressUri is "Unknown".
Note over E: Passes the SingleValueInitFunc delegate <br> to async nonblocking cache.
G->>E: 8. Returns the cached addresses
E->>D: 9. Returns the resolved addresses
D->>C: 10. Returns the resolved addresses
C->>B: 11. Returns the resolved addresses
B->>A: 12. Returns the resolved addresses
A->>A: 13. Request failes with "GoneException"
Note over A: Sets Force Refresh <br> header to true.
A->>A: 14. ReadMultipleReplicaAsync()
A->>B: 15. ResolveAllTransportAddressUriAsync (forceRefresh - true)
B->>C: 16. ResolveAsync(forceRefresh - true)
C->>D: 17. ResolveAsync(forceRefresh - true)
D->>D: 18. ResolveAddressesAndIdentityAsync(forceRefresh - true)
D->>E: 19. TryGetAddressesAsync(forceRefresh - true)
E->>G: 20. GetAsync ("singleValueInitFunc delegate", "forceRefresh - true")
Note over E: Passes the SingleValueInitFunc delegate <br> to async nonblocking cache.
G->>E: 21. Invokes the singleValueInitFunc delegate
E->>E: 22. SetTransportAddressUrisToUnhealthy(currentCachedValue, failedEndpoints)
Note over E: Sets the failed TransportAddressUris <br> to an "Unhealthy" status.
E->>L: 23. SetUnhealthy()
E->>E: 24. GetAddressesForRangeIdAsync(forceRefresh - true, cachedAddresses)
E->>H: 25. Invokes the GatewayService using GetServerAddressesViaGatewayAsync() <br> to receive new addresses.
H->>E: 26. Receives the resolved new addresses.
E->>E: 27. MergeAddresses<NewAddress, CachedAddress>
Note over E: The purpose of the merge is to restore the health statuses of all the new addresses to <br> that of their recpective cached addresses, if returned same addresses.
E->>L: 28. SetRefreshedIfUnhealthy()
Note over E: Sets any TransportAddressUri with <br> an "Unhealthy" status to "UnhealthyPending".
E->>E: 29. ValidateReplicaAddresses(mergedTransportAddress)
Note over E: Validates the backend replicas, <br> if the replica validation env variable is enabled.
E-->>F: 30. OpenRntbdChannelsAsync(mergedTransportAddress)
Note over E: If replica validation is enabled, then validate unhealthy pending <br> replicas by opening RNTBD connections. Note that this <br> operations executes as a background task.
F-->>J: 31. OpenConnectionAsync() <br> using the resolved transport address uris
J-->>K: 32. OpenConnectionAsync() <br> using the resolved physical address uris
K-->>L: 33. SetConnected()
Note over K: Initializes and Establishes a RNTBD <br> context negotiation to the backend replica nodes.
E->>G: 34. Returns the merged addresses to cache and store into the Async Nonblocking Cache
G->>E: 35. Returns the resolved addresses
E->>E: 36. ShouldRefreshHealthStatus()
Note over E: Refresh the cache if there was an address <br> has been marked as unhealthy long enough (more than a minute) <br> and need to revalidate its status.
E-->>G: 37. Refresh ("GetAddressesForRangeIdAsync() as the singleValueInitFunc delegate", "forceRefresh - true")
Note over E: Note that the refresh operation <br> happens as a background task.
E->>D: 38. Returns the resolved addresses
D->>C: 39. Returns the resolved addresses
C->>B: 40. Returns the resolved addresses
B->>A: 41. Returns the resolved transport addresses
A->>I: 42. GetTransportAddresses("transportAddressUris", "replicaAddressValidationEnabled")
I->>I: 43. ReorderReplicasByHealthStatus()
Note over I: Re-orders the transport addresses <br> by their health statuses <br> Connected/Unknown >> UnhealthyPending >> Unhealthy.
I->>A: 44. Returns the transport addresses re-ordered by their health statuses
```
### State Diagram to Understand the `TransportAddressUri` Health State Transformations.
To better understand the design, it is critical to understand the `TransportAddressUri` health state transformations. The below state diagram depicts the `TransportAddressUri` state transitions in detail.
```mermaid
stateDiagram-v2
[*] --> Unknown
note right of Unknown
Initial state of <br> a TransportAddressUri
end note
Unknown --> Connected: Channel Aquire <br> Successful
Unknown --> Unhealthy: Channel Aquire <br> Failed
Unhealthy --> Connected: Channel Aquire <br> Successful after 1 Min
Unhealthy --> UnhealthyPending: Refresh Addresses <br> from Gateway when <br> Replica Validation <br> is enabled
UnhealthyPending --> Unhealthy: RntbdOpenConnectionHandler - <br> Channel Aquire <br> Failed
UnhealthyPending --> Connected: RntbdOpenConnectionHandler - <br> Channel Aquire <br> Successful
Connected --> Unhealthy: Request failed with 410 <br> GoneException and <br> force refresh
Connected --> [*]
```
To accomplish the above changes in the replica validation flow, below are the class diagrams and the proposed code changes in both `Microsoft.Azure.Cosmos.Direct` and `Microsoft.Azure.Cosmos` packages.
### `Microsoft.Azure.Cosmos.Direct` package class diagrams.
Introduce a new `IOpenConnectionsHandler` interface with `OpenRntbdChannelsAsync()` method. Create a new class `RntbdOpenConnectionHandler` that will eventually implement the `IOpenConnectionsHandler` interface and override the `OpenRntbdChannelsAsync()` method to establish Rntbd connections to the transport address uris. Note that, this class will also add the concurrency control logic, so that any burst of connection creation could be avoided. The below class diagram depicts the same behavior.
```mermaid
classDiagram
IOpenConnectionsHandler --|> RntbdOpenConnectionHandler : implements
<<Interface>> IOpenConnectionsHandler
IOpenConnectionsHandler: +OpenRntbdChannelsAsync(IReadOnlyList~TransportAddressUri~ addresses)
class RntbdOpenConnectionHandler{
-TransportClient transportClient
-SemaphoreSlim semaphore
-int SemaphoreAcquireTimeoutInMillis
+OpenRntbdChannelsAsync()
}
```
Extend the `IAddressResolverExtension` interface with `SetOpenConnectionsHandler()` method. The benefits and the utilizations are provided below:
- The `GlobalAddressResolver` can then implement the `SetOpenConnectionsHandler()` method, which will be invoked by the `StoreClient` constructor to set the `RntbdOpenConnectionHandler`.
- The `OpenConnectionsAsync()` method present inside `GlobalAddressResolver`, will be invoked from the `ReplicatedResourceClient` eventually. The `AddressResolver` class can simply implement the method/s and return an empty `Task`. The `GlobalAddressResolver`.`OpenConnectionsAsync()` is responsible for
- Resolving the collection by the database name and container link,
- Fetching the partition key full ranges and
- Invoking the `GatewayAddressCache` for the preferred region, with the `RntbdOpenConnectionHandler` instance, passed by the `StoreClient`.
The below class diagram depicts the same behavior.
*[Note: The `IAddressResolverExtension` was introduced to hold the new methods, which could break the existing build, if put directly into `IAddressResolver` interface. The `IAddressResolverExtension` will be removed eventually and the existing methods will be moved into `IAddressResolver` interface.]*
```mermaid
classDiagram
IAddressResolver --|> IAddressResolverExtension : extends
IAddressResolverExtension --|> GlobalAddressResolver : implements
<<Interface>> IAddressResolver
<<Interface>> IAddressResolverExtension
IAddressResolverExtension: +OpenConnectionsAsync(string databaseName, string containerLinkUri)
IAddressResolverExtension: +SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionHandler)
class GlobalAddressResolver{
+OpenConnectionsAsync()
+SetOpenConnectionsHandler()
}
```
Update the method definition of `GetTransportAddresses()` present in `IAddressEnumerator` to add a new `boolean` argument `replicaAddressValidationEnabled`. This will help to choose the correct set of replica/s when replica validation is enabled or disabled. Additionally, a new private method `ReorderReplicasByHealthStatus()` will be added in `AddressEnumerator` to re-order the transport uri/s by their health status priority. The below class diagram depicts the same changes.
```mermaid
classDiagram
IAddressEnumerator --|> AddressEnumerator : implements
<<Interface>> IAddressEnumerator
IAddressEnumerator: +GetTransportAddresses(IReadOnlyList~TransportAddressUri~ transportAddressUris, Lazy~HashSet~TransportAddressUri~~ failedEndpoints, bool replicaAddressValidationEnabled) IEnumerable~TransportAddressUri~
class AddressEnumerator{
+GetTransportAddresses()
-ReorderReplicasByHealthStatus(IReadOnlyList~TransportAddressUri~ transportAddressUris, Lazy~HashSet~TransportAddressUri~~ failedEndpoints, bool replicaAddressValidationEnabled) IEnumerable~TransportAddressUri~
}
```
This class will be updated eventually, with critical set of changes required for the replica validation workstream. It will introduce an enum `HealthStatus` with `4` new health statuses - `Connected`, `Unknown`, `UnhealthyPending` and `Unhealthy` to re-order the replicas with their status priorities. The setters will help to capture the correct health state of the `TransportAddressUri` at any given point of time. The below class diagram depicts the same behavior.
```mermaid
classDiagram
class TransportAddressUri {
-DateTime? lastUnknownTimestamp
-DateTime? lastUnhealthyPendingTimestamp
-DateTime? lastUnhealthyTimestamp
-HealthStatus healthStatus
-ReaderWriterLockSlim healthStatusLock
+SetConnected()
+SetRefreshedIfUnhealthy()
+SetHealthStatus(HealthStatus status, bool forceSet)
+GetHealthStatus() HealthStatus
+GetEffectiveHealthStatus() HealthStatus
+ShouldRefreshHealthStatus() bool
}
class HealthStatus {
<<enumeration>>
Connected : 100
Unknown : 200
UnhealthyPending : 300
Unhealthy : 400
}
```
### `Microsoft.Azure.Cosmos` package class diagrams.
Extend the `IAddressCache` interface with `SetOpenConnectionsHandler()` method. This method should take an instance of the `RntbdOpenConnectionHandler` (which implements the `IOpenConnectionsHandler`) and sets it to a private field for later usage, to establish the Rntbd connectivity to the backend replica nodes.
```mermaid
classDiagram
IAddressCache --|> GatewayAddressCache : implements
<<Interface>> IAddressCache
IAddressCache: +SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHandler)
class GatewayAddressCache {
-IOpenConnectionsHandler openConnectionsHandler
-string replicaValidationVariableName
+SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHandler)
-MergeAddresses(PartitionAddressInformation newAddresses, PartitionAddressInformation cachedAddresses) PartitionAddressInformation
-ValidateReplicaAddresses(IReadOnlyList~TransportAddressUri~ addresses)
}
```
Add a new method `Refresh()` into the `AsyncCacheNonBlocking` to force refresh the address cache on demand. Note that `Refresh()` is used to force refresh any `Unhealthy` replica nodes, which has been in `Unhealthy` state for more than `1` minute. That way, any `Unhealthy` replica nodes will be back into the validation pool, which will eventually be useful to avoid any skewed replica selection.
```mermaid
classDiagram
class AsyncCacheNonBlocking~TKey, TValue~ {
+Task Refresh(TKey key, Func~TValue, Task~TValue~~ singleValueInitFunc)
}
```
## Pull Request with Sample Code Changes
Here is the [link to a sample PR](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/3570) which provides an overview of the incoming changes.
## References
- [Mermaid Documentation.](https://mermaid-js.github.io/mermaid/#/)
- [Upgrade Resiliency Tasks List.](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/3409)
- [Design Document to Utilize RNTBD Context Negotiation During `CosmosClient` Initialization.](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/3442)