diff --git a/Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs b/Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs index 2b1f82ea3..9fb908d83 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs @@ -120,30 +120,11 @@ namespace Microsoft.Azure.Cosmos throw; } - try - { - return await initialLazyValue.CreateAndWaitForBackgroundRefreshTaskAsync( - createRefreshTask: singleValueInitFunc); - } - catch (Exception e) - { - if (initialLazyValue.ShouldRemoveFromCacheThreadSafe()) - { - DefaultTrace.TraceError( - "AsyncCacheNonBlocking.GetAsync with ForceRefresh Failed. key: {0}, Exception: {1}", - key, - e); - - // In some scenarios when a background failure occurs like a 404 - // the initial cache value should be removed. - if (this.removeFromCacheOnBackgroundRefreshException(e)) - { - this.TryRemove(key); - } - } - - throw; - } + return await this.UpdateCacheAndGetValueFromBackgroundTaskAsync( + key: key, + initialValue: initialLazyValue, + callbackDelegate: singleValueInitFunc, + operationName: nameof(GetAsync)); } // The AsyncLazyWithRefreshTask is lazy and won't create the task until GetValue is called. @@ -196,6 +177,70 @@ namespace Microsoft.Azure.Cosmos return this.values.TryRemove(key, out _); } + /// + /// Refreshes the async non blocking cache on-demand for the given + /// and caches the result for later usage. + /// + /// The requested key to be refreshed. + /// A func delegate to be invoked at a later point of time. 
public async Task RefreshAsync(
    TKey key,
    Func<TValue, Task<TValue>> singleValueInitFunc)
{
    // Only keys that are already present in the cache are refreshed; an unknown key is a no-op.
    bool isCached = this.values.TryGetValue(key, out AsyncLazyWithRefreshTask<TValue> cachedLazyValue);
    if (!isCached)
    {
        return;
    }

    // The value returned by the helper is intentionally discarded here.
    await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
        key: key,
        initialValue: cachedLazyValue,
        callbackDelegate: singleValueInitFunc,
        operationName: nameof(RefreshAsync));
}

/// <summary>
/// Asks the given cache entry to create and await its background refresh task using the callback delegate.
/// When the refresh fails, the entry may be evicted from the cache before the exception is rethrown.
/// </summary>
/// <param name="key">The requested key to be updated.</param>
/// <param name="initialValue">The cache entry that currently holds the cached value for the key.</param>
/// <param name="callbackDelegate">A func callback delegate handed to the entry to produce the refreshed value.</param>
/// <param name="operationName">A string naming the calling cache operation; only used in the error trace.</param>
/// <returns>The value produced by the background refresh task.</returns>
private async Task<TValue> UpdateCacheAndGetValueFromBackgroundTaskAsync(
    TKey key,
    AsyncLazyWithRefreshTask<TValue> initialValue,
    Func<TValue, Task<TValue>> callbackDelegate,
    string operationName)
{
    try
    {
        TValue refreshedValue = await initialValue.CreateAndWaitForBackgroundRefreshTaskAsync(
            createRefreshTask: callbackDelegate);

        return refreshedValue;
    }
    catch (Exception refreshException)
    {
        // The entry decides whether this caller is the one responsible for eviction and tracing.
        if (!initialValue.ShouldRemoveFromCacheThreadSafe())
        {
            throw;
        }

        // In some scenarios when a background failure occurs like a 404
        // the initial cache value should be removed. TryRemove is only
        // attempted when the predicate asks for removal.
        bool removed = this.removeFromCacheOnBackgroundRefreshException(refreshException)
            && this.TryRemove(key);

        DefaultTrace.TraceError(
            "AsyncCacheNonBlocking Failed. key: {0}, operation: {1}, tryRemoved: {2}, Exception: {3}",
            key,
            operationName,
            removed,
            refreshException);

        throw;
    }
}

///
/// This is AsyncLazy that has an additional Task that can
/// be used to update the value.
This allows concurrent requests diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 7c6355591..b57744376 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -46,6 +46,7 @@ namespace Microsoft.Azure.Cosmos.Routing private readonly bool enableTcpConnectionEndpointRediscovery; private readonly CosmosHttpClient httpClient; + private readonly bool isReplicaAddressValidationEnabled; private Tuple masterPartitionAddressCache; private DateTime suboptimalMasterPartitionTimestamp; @@ -84,10 +85,27 @@ namespace Microsoft.Azure.Cosmos.Routing GatewayAddressCache.ProtocolString(this.protocol)); this.openConnectionsHandler = openConnectionsHandler; + this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariableAsBool( + name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled, + defaultValue: false); } public Uri ServiceEndpoint => this.serviceEndpoint; + /// + /// Gets the address information from the gateway and sets them into the async non blocking cache for later lookup. + /// Additionally attempts to establish Rntbd connections to the backend replicas based on `shouldOpenRntbdChannels` + /// boolean flag. + /// + /// A string containing the database name. + /// An instance of containing the collection properties. + /// A read only list containing the partition key range identities. + /// A boolean flag indicating whether Rntbd connections are required to be established + /// to the backend replica nodes. For cosmos client initialization and cache warmups, the Rntbd connection are needed to be + /// openned deterministically to the backend replicas to reduce latency, thus the + /// should be set to `true` during cosmos client initialization and cache warmups. 
The OpenAsync flow from DocumentClient + /// doesn't require the connections to be opened deterministically thus should set the parameter to `false`. + /// An instance of . public async Task OpenConnectionsAsync( string databaseName, ContainerProperties collection, @@ -161,6 +179,10 @@ namespace Microsoft.Azure.Cosmos.Routing new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId), addressInfo.Item2); + // The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the + // backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as + // `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any + // other flow, the flag should be passed as `false`. if (this.openConnectionsHandler != null && shouldOpenRntbdChannels) { await this.openConnectionsHandler @@ -178,6 +200,7 @@ namespace Microsoft.Azure.Cosmos.Routing this.openConnectionsHandler = openConnectionsHandler; } + /// public async Task TryGetAddressesAsync( DocumentServiceRequest request, PartitionKeyRangeIdentity partitionKeyRangeIdentity, @@ -229,6 +252,7 @@ namespace Microsoft.Azure.Cosmos.Routing return this.GetAddressesForRangeIdAsync( request, + cachedAddresses: currentCachedValue, partitionKeyRangeIdentity.CollectionRid, partitionKeyRangeIdentity.PartitionKeyRangeId, forceRefresh: forceRefreshPartitionAddresses); @@ -259,6 +283,7 @@ namespace Microsoft.Azure.Cosmos.Routing key: partitionKeyRangeIdentity, singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync( request, + cachedAddresses: null, partitionKeyRangeIdentity.CollectionRid, partitionKeyRangeIdentity.PartitionKeyRangeId, forceRefresh: false), @@ -278,6 +303,27 @@ namespace Microsoft.Azure.Cosmos.Routing this.suboptimalServerPartitionTimestamps.TryAdd(partitionKeyRangeIdentity, DateTime.UtcNow); } + // Refresh the cache on-demand, if there were some address that 
remained as unhealthy long enough (more than 1 minute) + // and need to revalidate its status. The reason it is not dependent on 410 to force refresh the addresses, is being: + // When an address is marked as unhealthy, then the address enumerator will deprioritize it and move it back to the + // end of the transport uris list. Therefore, it could happen that no request will land on the unhealthy address for + // an extended period of time therefore, the chances of 410 (Gone Exception) to trigger the forceRefresh workflow may + // not happen for that particular replica. + if (addresses + .Get(Protocol.Tcp) + .ReplicaTransportAddressUris + .Any(x => x.ShouldRefreshHealthStatus())) + { + Task refreshAddressesInBackgroundTask = Task.Run(async () => await this.serverPartitionAddressCache.RefreshAsync( + key: partitionKeyRangeIdentity, + singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync( + request, + cachedAddresses: currentCachedValue, + partitionKeyRangeIdentity.CollectionRid, + partitionKeyRangeIdentity.PartitionKeyRangeId, + forceRefresh: true))); + } + return addresses; } catch (DocumentClientException ex) @@ -384,6 +430,7 @@ namespace Microsoft.Azure.Cosmos.Routing key: partitionKeyRangeIdentity, singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync( null, + cachedAddresses: null, partitionKeyRangeIdentity.CollectionRid, partitionKeyRangeIdentity.PartitionKeyRangeId, forceRefresh: true), @@ -444,6 +491,7 @@ namespace Microsoft.Azure.Cosmos.Routing private async Task GetAddressesForRangeIdAsync( DocumentServiceRequest request, + PartitionAddressInformation cachedAddresses, string collectionRid, string partitionKeyRangeId, bool forceRefresh) @@ -475,6 +523,32 @@ namespace Microsoft.Azure.Cosmos.Routing throw new PartitionKeyRangeGoneException(errorMessage) { ResourceAddress = collectionRid }; } + if (this.isReplicaAddressValidationEnabled) + { + // The purpose of this step is to merge the new transport addresses with the old one. 
What this means is - + // 1. If a newly returned address from gateway is already a part of the cache, then restore the health state + // of the new address with that of the cached one. + // 2. If a newly returned address from gateway doesn't exist in the cache, then keep using the new address + // with `Unknown` (initial) status. + PartitionAddressInformation mergedAddresses = GatewayAddressCache.MergeAddresses(result.Item2, cachedAddresses); + IReadOnlyList transportAddressUris = mergedAddresses.Get(Protocol.Tcp)?.ReplicaTransportAddressUris; + + // If cachedAddresses are null, that would mean that the returned address from gateway would remain in Unknown + // status and there is no cached state that could transition them into Unhealthy. + if (cachedAddresses != null) + { + foreach (TransportAddressUri address in transportAddressUris) + { + // The main purpose for this step is to move address health status from Unhealthy to UnhealthyPending. + address.SetRefreshedIfUnhealthy(); + } + } + + this.ValidateUnhealthyPendingReplicas(transportAddressUris); + + return mergedAddresses; + } + return result.Item2; } } @@ -760,6 +834,86 @@ namespace Microsoft.Azure.Cosmos.Routing } } + /// + /// Validates the unhealthy pending replicas by attempting to open the Rntbd connection. This operation + /// will eventually marks the unhealthy pending replicas to healthy, if the rntbd connection attempt made was + /// successful or unhealthy otherwise. + /// + /// A read-only list of needs to be validated. 
private void ValidateUnhealthyPendingReplicas(
    IReadOnlyList<TransportAddressUri> addresses)
{
    if (addresses == null)
    {
        throw new ArgumentNullException(nameof(addresses));
    }

    // The handler is optional (other call sites null-check it before use). Capture it once so the
    // background task cannot observe a null or swapped handler and fail with an unobserved exception.
    var handler = this.openConnectionsHandler;
    if (handler == null)
    {
        return;
    }

    // Materialize the filter exactly once. A deferred query would be re-evaluated on the background
    // thread, where the health states may already differ from what was checked here.
    List<TransportAddressUri> addressesNeedToValidation = addresses
        .Where(address => address
            .GetCurrentHealthState()
            .GetHealthStatus() == TransportAddressHealthState.HealthStatus.UnhealthyPending)
        .ToList();

    if (addressesNeedToValidation.Count == 0)
    {
        return;
    }

    // Fire-and-forget: the connection attempts run in the background and are not awaited by the caller.
    _ = Task.Run(async () => await handler.TryOpenRntbdChannelsAsync(
        addresses: addressesNeedToValidation));
}

/// <summary>
/// Merge the new addresses returned from gateway service with that of the cached addresses. If the returned
/// new addresses list contains some of the addresses, which are already cached, then reset the health state
/// of the new address to that of the cached one. If the new addresses doesn't contain any of the cached
/// addresses, then keep using the health state of the new addresses, which should be `unknown`.
/// </summary>
/// <param name="newAddresses">A <see cref="PartitionAddressInformation"/> containing the latest
/// addresses being returned from gateway.</param>
/// <param name="cachedAddresses">A <see cref="PartitionAddressInformation"/> containing the cached
/// addresses from the async non blocking cache.</param>
/// <returns>A <see cref="PartitionAddressInformation"/> containing the merged addresses.</returns>
private static PartitionAddressInformation MergeAddresses(
    PartitionAddressInformation newAddresses,
    PartitionAddressInformation cachedAddresses)
{
    if (newAddresses == null)
    {
        throw new ArgumentNullException(nameof(newAddresses));
    }

    // Nothing cached yet: the gateway addresses keep their initial health state.
    if (cachedAddresses == null)
    {
        return newAddresses;
    }

    PerProtocolPartitionAddressInformation currentAddressInfo = newAddresses.Get(Protocol.Tcp);
    PerProtocolPartitionAddressInformation cachedAddressInfo = cachedAddresses.Get(Protocol.Tcp);

    // The caller treats the Tcp lookup as nullable, so do the same here: without Tcp addresses on
    // either side there is no health state to carry over.
    if (currentAddressInfo?.ReplicaTransportAddressUris == null
        || cachedAddressInfo?.ReplicaTransportAddressUris == null)
    {
        return newAddresses;
    }

    // Index the cached replicas by their string form for O(1) matching against the new ones.
    Dictionary<string, TransportAddressUri> cachedAddressDict = new ();
    foreach (TransportAddressUri cachedUri in cachedAddressInfo.ReplicaTransportAddressUris)
    {
        cachedAddressDict[cachedUri.ToString()] = cachedUri;
    }

    foreach (TransportAddressUri transportAddressUri in currentAddressInfo.ReplicaTransportAddressUris)
    {
        // Single lookup instead of ContainsKey followed by the indexer.
        if (!cachedAddressDict.TryGetValue(transportAddressUri.ToString(), out TransportAddressUri cachedTransportAddressUri))
        {
            // Address is new to the cache: keep the state it arrived with.
            continue;
        }

        // Read the cached health state once so that the status and all three timestamps are taken
        // from the same state object, even if the cached replica is updated concurrently.
        var cachedHealthState = cachedTransportAddressUri.GetCurrentHealthState();

        transportAddressUri.ResetHealthStatus(
            status: cachedHealthState.GetHealthStatus(),
            lastUnknownTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.Unknown),
            lastUnhealthyPendingTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.UnhealthyPending),
            lastUnhealthyTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.Unhealthy));
    }

    return newAddresses;
}

 protected virtual void Dispose(bool disposing) { if (this.disposedValue) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index 
3954b4ae8..9e3b87ec8 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -6,16 +6,15 @@ namespace Microsoft.Azure.Cosmos using System; using System.Collections.Generic; using System.Collections.ObjectModel; - using System.Collections.Specialized; using System.Linq; using System.Net; using System.Net.Http; + using System.Reflection; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Routing; - using Microsoft.Azure.Cosmos.Serialization.HybridRow; using Microsoft.Azure.Cosmos.Tests; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; @@ -872,6 +871,436 @@ namespace Microsoft.Azure.Cosmos expectedSuccessCount: addresses.Count); } + /// + /// Test to validate that when replica validation is enabled and force address refresh happens to fetch the latest address from gateway, + /// if in case the gateway returns the same address which was previously unhealthy, the gateway address cache resets the returned status + /// to unhealthy and validates that replica using the open connection handler and finally marks it to connected. + /// + [TestMethod] + [Owner("dkunda")] + public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabled_ShouldValidateUnhealthyReplicasHealth() + { + // Arrange. 
+ ManualResetEvent manualResetEvent = new(initialState: false); + Mock mockHttpHandler = new (MockBehavior.Strict); + string oldAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/4s"; + string newAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/5s"; + string addressTobeMarkedUnhealthy = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/2s"; + mockHttpHandler.SetupSequence(x => x.SendAsync( + It.IsAny(), + It.IsAny())) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + addressTobeMarkedUnhealthy, + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + oldAddress, + })) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + addressTobeMarkedUnhealthy, + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + newAddress, + })); + + FakeOpenConnectionHandler fakeOpenConnectionHandler = new ( + failingIndexes: new HashSet(), + manualResetEvent: manualResetEvent); + + HttpClient httpClient = new (new HttpHandlerHelper(mockHttpHandler.Object)); + GatewayAddressCache cache = new ( + new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint), + Documents.Client.Protocol.Tcp, + this.mockTokenProvider.Object, + this.mockServiceConfigReader.Object, + MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), + openConnectionsHandler: fakeOpenConnectionHandler, + suboptimalPartitionForceRefreshIntervalInSeconds: 2, + enableTcpConnectionEndpointRediscovery: 
true); + + // By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature + // for the purpose of this test. + FieldInfo fieldInfo = cache + .GetType() + .GetField( + name: "isReplicaAddressValidationEnabled", + bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); + + fieldInfo.SetValue( + obj: cache, + value: true); + + DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); + + // Act and Assert. + PartitionAddressInformation addressInfo = await cache.TryGetAddressesAsync( + request: request, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: false, + cancellationToken: CancellationToken.None); + + TransportAddressUri refreshedUri = addressInfo + .Get(Documents.Client.Protocol.Tcp)? + .ReplicaTransportAddressUris + .Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy)); + + Assert.IsNotNull(refreshedUri); + Assert.AreEqual( + expected: TransportAddressHealthState.HealthStatus.Unknown, + actual: refreshedUri.GetCurrentHealthState().GetHealthStatus()); + + Assert.AreEqual(4, addressInfo.AllAddresses.Count); + Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress)); + Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress)); + + // Because force refresh is requested, an unhealthy replica is added to the failed endpoint so that it's status could be validted. 
+ request.RequestContext.FailedEndpoints.Value.Add( + new TransportAddressUri( + addressUri: new Uri( + uriString: addressTobeMarkedUnhealthy))); + + addressInfo = await cache.TryGetAddressesAsync( + request: request, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: true, + cancellationToken: CancellationToken.None); + + Assert.AreEqual(4, addressInfo.AllAddresses.Count); + Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress)); + Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress)); + + // Waits until a completion signal from the background task is received. + GatewayAddressCacheTests.WaitForManualResetEventSignal( + manualResetEvent: manualResetEvent, + shouldReset: false); + + refreshedUri = addressInfo + .Get(Documents.Client.Protocol.Tcp)? + .ReplicaTransportAddressUris + .Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy)); + + Assert.IsNotNull(refreshedUri); + Assert.AreEqual( + expected: TransportAddressHealthState.HealthStatus.Connected, + actual: refreshedUri.GetCurrentHealthState().GetHealthStatus()); + + mockHttpHandler.VerifyAll(); + GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes( + fakeOpenConnectionHandler: fakeOpenConnectionHandler, + expectedExceptionCount: 0, + expectedMethodInvocationCount: 1, + expectedReceivedAddressesCount: 1, + expectedSuccessCount: 1); + } + + /// + /// Test to validate that when replica validation is enabled and there exists a replica such that it remained unhealthy for a period of one minute or + /// more, then even though a force refresh is not requested, the unhealthy replicas at least get a chance to re-validate it's status by the + /// on-demand async non-blocking cache refresh flow and eventually marks itself as healthy once the open connection attempt is successful. 
+ /// + [TestMethod] + [Owner("dkunda")] + public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabledAndUnhealthyUriExistsForOneMinute_ShouldForceRefreshUnhealthyReplicas() + { + // Arrange. + ManualResetEvent manualResetEvent = new (initialState: false); + Mock mockHttpHandler = new(MockBehavior.Strict); + string oldAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/4s"; + string newAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/5s"; + string addressTobeMarkedUnhealthy = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/2s"; + mockHttpHandler.SetupSequence(x => x.SendAsync( + It.IsAny(), + It.IsAny())) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + addressTobeMarkedUnhealthy, + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + oldAddress, + })) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + addressTobeMarkedUnhealthy, + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + newAddress, + })) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + addressTobeMarkedUnhealthy, + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + newAddress, + })); + + FakeOpenConnectionHandler fakeOpenConnectionHandler = new ( + 
failIndexesByAttempts: new Dictionary>() + { + { 0, new HashSet() { 0 } } + }, + manualResetEvent: manualResetEvent); + + HttpClient httpClient = new (new HttpHandlerHelper(mockHttpHandler.Object)); + GatewayAddressCache cache = new ( + new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint), + Documents.Client.Protocol.Tcp, + this.mockTokenProvider.Object, + this.mockServiceConfigReader.Object, + MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), + openConnectionsHandler: fakeOpenConnectionHandler, + suboptimalPartitionForceRefreshIntervalInSeconds: 2, + enableTcpConnectionEndpointRediscovery: true); + + // By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature + // for the purpose of this test. + FieldInfo fieldInfo = cache + .GetType() + .GetField( + name: "isReplicaAddressValidationEnabled", + bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); + + fieldInfo.SetValue( + obj: cache, + value: true); + + DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); + + // Act and Assert. + PartitionAddressInformation addressInfo = await cache.TryGetAddressesAsync( + request: request, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: false, + cancellationToken: CancellationToken.None); + + TransportAddressUri refreshedUri = addressInfo + .Get(Documents.Client.Protocol.Tcp)? 
+ .ReplicaTransportAddressUris + .Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy)); + + Assert.IsNotNull(refreshedUri); + Assert.AreEqual( + expected: TransportAddressHealthState.HealthStatus.Unknown, + actual: refreshedUri.GetCurrentHealthState().GetHealthStatus()); + + Assert.AreEqual(4, addressInfo.AllAddresses.Count); + Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress)); + Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress)); + + // Because force refresh is requested, an unhealthy replica is added to the failed endpoint so that it's health status could be validted. + request.RequestContext.FailedEndpoints.Value.Add( + new TransportAddressUri( + addressUri: new Uri( + uriString: addressTobeMarkedUnhealthy))); + + addressInfo = await cache.TryGetAddressesAsync( + request: request, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: true, + cancellationToken: CancellationToken.None); + + Assert.AreEqual(4, addressInfo.AllAddresses.Count); + Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress)); + Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress)); + + // Waits until a completion signal from the background task is received. + GatewayAddressCacheTests.WaitForManualResetEventSignal( + manualResetEvent: manualResetEvent, + shouldReset: true); + + // During the replica validation flow, the connection attempt was not successful thus the replica + // was marked unhealthy. + refreshedUri = addressInfo + .Get(Documents.Client.Protocol.Tcp)? 
+ .ReplicaTransportAddressUris + .Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy)); + + Assert.IsNotNull(refreshedUri); + Assert.AreEqual( + expected: TransportAddressHealthState.HealthStatus.Unhealthy, + actual: refreshedUri.GetCurrentHealthState().GetHealthStatus()); + + GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes( + fakeOpenConnectionHandler: fakeOpenConnectionHandler, + expectedExceptionCount: 1, + expectedMethodInvocationCount: 1, + expectedReceivedAddressesCount: 1, + expectedSuccessCount: 0); + + // A delay of 2 minute was added to make the replica unhealthy for more than one minute. This + // will make sure the unhealthy replica gets a chance to re-validate it's health status. + ReflectionUtils.AddMinuteToDateTimeFieldUsingReflection( + objectName: refreshedUri.GetCurrentHealthState(), + fieldName: "lastUnhealthyTimestamp", + delayInMinutes: -2); + + addressInfo = await cache.TryGetAddressesAsync( + request: request, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: false, + cancellationToken: CancellationToken.None); + + // Waits until a completion signal from the background task is received. 
+ GatewayAddressCacheTests.WaitForManualResetEventSignal( + manualResetEvent: manualResetEvent, + shouldReset: false); + + Assert.AreEqual(4, addressInfo.AllAddresses.Count); + GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes( + fakeOpenConnectionHandler: fakeOpenConnectionHandler, + expectedExceptionCount: 1, + expectedMethodInvocationCount: 2, + expectedReceivedAddressesCount: 1, + expectedSuccessCount: 1); + + addressInfo = await cache.TryGetAddressesAsync( + request: request, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: false, + cancellationToken: CancellationToken.None); + + refreshedUri = addressInfo + .Get(Documents.Client.Protocol.Tcp)? + .ReplicaTransportAddressUris + .Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy)); + + // Because the open connection attempt to the unhealthy replica was successful, the replica was + // marked as healthy. + mockHttpHandler.VerifyAll(); + Assert.AreEqual(4, addressInfo.AllAddresses.Count); + Assert.IsNotNull(refreshedUri); + Assert.AreEqual( + expected: TransportAddressHealthState.HealthStatus.Connected, + actual: refreshedUri.GetCurrentHealthState().GetHealthStatus()); + + // This assertion makes sure that no additional calls were made to the open connection handler after + // since the last address refresh, because all the replicas are now either Unknown or Connected. + GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes( + fakeOpenConnectionHandler: fakeOpenConnectionHandler, + expectedExceptionCount: 1, + expectedMethodInvocationCount: 2, + expectedReceivedAddressesCount: 1, + expectedSuccessCount: 1); + } + + /// + /// Test to validate that when replica validation is disabled and there exists some unhealthy replicas, the gateway address + /// cache doesn't validate the health state of the unhealthy replicas. 
+ /// + [TestMethod] + [Owner("dkunda")] + public async Task TryGetAddressesAsync_WhenReplicaVlidationDisabled_ShouldNotValidateUnhealthyReplicas() + { + // Arrange. + Mock mockHttpHandler = new(MockBehavior.Strict); + string oldAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/4s"; + string newAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/5s"; + string addressTobeMarkedUnhealthy = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/2s"; + mockHttpHandler.SetupSequence(x => x.SendAsync( + It.IsAny(), + It.IsAny())) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + addressTobeMarkedUnhealthy, + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + oldAddress, + })) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + addressTobeMarkedUnhealthy, + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + newAddress, + })); + + FakeOpenConnectionHandler fakeOpenConnectionHandler = new(failingIndexes: new HashSet()); + HttpClient httpClient = new(new HttpHandlerHelper(mockHttpHandler.Object)); + GatewayAddressCache cache = new( + new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint), + Documents.Client.Protocol.Tcp, + this.mockTokenProvider.Object, + this.mockServiceConfigReader.Object, + MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), + openConnectionsHandler: fakeOpenConnectionHandler, + 
suboptimalPartitionForceRefreshIntervalInSeconds: 2, + enableTcpConnectionEndpointRediscovery: true); + + DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); + + // Act and Assert. + PartitionAddressInformation addressInfo = await cache.TryGetAddressesAsync( + request: request, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: false, + cancellationToken: CancellationToken.None); + + Assert.AreEqual(4, addressInfo.AllAddresses.Count); + Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress)); + Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress)); + + // Because force refresh is requested, an unhealthy replica is added to the failed endpoint so that it's status could be validted. + request.RequestContext.FailedEndpoints.Value.Add( + new TransportAddressUri( + addressUri: new Uri( + uriString: addressTobeMarkedUnhealthy))); + + addressInfo = await cache.TryGetAddressesAsync( + request: request, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: true, + cancellationToken: CancellationToken.None); + + Assert.AreEqual(4, addressInfo.AllAddresses.Count); + Assert.AreEqual(0, addressInfo.AllAddresses.Count(x => x.PhysicalUri == oldAddress)); + Assert.AreEqual(1, addressInfo.AllAddresses.Count(x => x.PhysicalUri == newAddress)); + + TransportAddressUri refreshedUri = addressInfo + .Get(Documents.Client.Protocol.Tcp)? 
+ .ReplicaTransportAddressUris + .Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy)); + + Assert.IsNotNull(refreshedUri); + Assert.AreEqual( + expected: TransportAddressHealthState.HealthStatus.Unknown, + actual: refreshedUri.GetCurrentHealthState().GetHealthStatus()); + + mockHttpHandler.VerifyAll(); + GatewayAddressCacheTests.AssertOpenConnectionHandlerAttributes( + fakeOpenConnectionHandler: fakeOpenConnectionHandler, + expectedExceptionCount: 0, + expectedMethodInvocationCount: 0, + expectedReceivedAddressesCount: 0, + expectedSuccessCount: 0); + } + + /// + /// Blocks the current thread until a completion signal on the ManualResetEvent + /// is received. A timeout of 5 seconds is added to avoid any thread starvation. + /// + /// An instance of . + /// A boolean flag indicating if a Reset on the manualResetEvent is required. + private static void WaitForManualResetEventSignal( + ManualResetEvent manualResetEvent, + bool shouldReset) + { + manualResetEvent.WaitOne( + millisecondsTimeout: 5000); + + if (shouldReset) + { + manualResetEvent.Reset(); + } + } + /// /// Helper method to assert on the class attributes /// to match with that of the expected ones. diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Routing/AsyncCacheNonBlockingTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Routing/AsyncCacheNonBlockingTests.cs index d5250b040..d58ab3db5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Routing/AsyncCacheNonBlockingTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Routing/AsyncCacheNonBlockingTests.cs @@ -392,5 +392,150 @@ namespace Microsoft.Azure.Cosmos.Tests.Routing Assert.AreEqual(1, totalLazyCalls); Assert.AreEqual("Test3", result); } + + /// + /// Test to validate that when RefreshAsync() is invoked for a valid existing key, the + /// cache refreshes the key successfully and the new value is updated in the cache. 
+ /// + [TestMethod] + [Owner("dkunda")] + public async Task RefreshAsync_WhenRefreshRequestedForAnExistingKey_ShouldRefreshTheCache() + { + // Arrange. + AsyncCacheNonBlocking asyncCache = new (); + + // Act and Assert. + string result = await asyncCache.GetAsync( + "key", + (_) => Task.FromResult("value1"), + (_) => false); + + Assert.AreEqual("value1", result); + + await asyncCache.RefreshAsync( + "key", + (_) => Task.FromResult("value2")); + + result = await asyncCache.GetAsync( + "key", + (_) => throw new Exception("Should not refresh."), + (_) => false); + + Assert.AreEqual("value2", result); + } + + /// + /// Test to validate that when a DocumentClientException is thrown during RefreshAsync() operation, + /// then the cache removes the key for which a refresh was requested. + /// + [TestMethod] + [Owner("dkunda")] + public async Task RefreshAsync_WhenThrowsDocumentClientException_ShouldRemoveKeyFromTheCache() + { + // Arrange. + AsyncCacheNonBlocking asyncCache = new (); + + // Act and Assert. + string result = await asyncCache.GetAsync( + "key", + (_) => Task.FromResult("value1"), + (_) => false); + + Assert.AreEqual("value1", result); + + result = await asyncCache.GetAsync( + "key", + (_) => Task.FromResult("value2"), + (_) => false); + + // Because the key is already present in the cache and a force refresh was not requested + // the func delegate should not get invoked and thus the cache should not be updated + // and still return the old cached value. 
+ Assert.AreEqual("value1", result); + + NotFoundException notFoundException = new ( + message: "Item was deleted."); + try + { + await asyncCache.RefreshAsync( + "key", + async (_) => + { + await Task.Delay(TimeSpan.FromMilliseconds(5)); + throw notFoundException; + }); + Assert.Fail("Should throw a NotFoundException"); + } + catch (NotFoundException exception) + { + Assert.AreEqual(notFoundException, exception); + } + + // Because the key was deleted from the cache, the func delegate should get invoked at + // this point and update the value to value2. + result = await asyncCache.GetAsync( + "key", + (_) => Task.FromResult("value2"), + (_) => false); + + Assert.AreEqual("value2", result); + } + + /// + /// Test to validate that when some other Exception is thrown during RefreshAsync() operation, + /// then the cache does not remove the key for which the refresh was originally requested. + /// + [TestMethod] + [Owner("dkunda")] + public async Task RefreshAsync_WhenThrowsOtherException_ShouldNotRemoveKeyFromTheCache() + { + // Arrange. + AsyncCacheNonBlocking asyncCache = new(); + + // Act and Assert. + string result = await asyncCache.GetAsync( + "key", + (_) => Task.FromResult("value1"), + (_) => false); + + Assert.AreEqual("value1", result); + + result = await asyncCache.GetAsync( + "key", + (_) => Task.FromResult("value2"), + (_) => false); + + // Because the key is already present in the cache and a force refresh was not requested + // the func delegate should not get invoked and thus the cache should not be updated + // and still return the old cached value. 
+ Assert.AreEqual("value1", result); + + Exception exception = new( + message: "Timeout exception."); + try + { + await asyncCache.RefreshAsync( + "key", + async (_) => + { + await Task.Delay(TimeSpan.FromMilliseconds(5)); + throw exception; + }); + Assert.Fail("Should throw a NotFoundException"); + } + catch (Exception ex) + { + Assert.AreEqual(ex, exception); + } + + // Because the key should not get deleted from the cache, the func delegate should not get invoked at + // this point. + result = await asyncCache.GetAsync( + "key", + (_) => Task.FromResult("value2"), + (_) => false); + + Assert.AreEqual("value1", result); + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/ReflectionUtils.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/ReflectionUtils.cs new file mode 100644 index 000000000..78bf6fafb --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/ReflectionUtils.cs @@ -0,0 +1,43 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests +{ + using System; + using System.Reflection; + + /// + /// Common utility class for reflaction related operations. + /// + internal static class ReflectionUtils + { + /// + /// This helper method uses reflection to set the private and read only fields + /// to the disered values to help the test cases mimic the expected behavior. + /// + /// An object where reflection will be applied to update the field. + /// A string containing the internal field name. + /// An integer to add or substract the desired delay in minutes. 
+ internal static void AddMinuteToDateTimeFieldUsingReflection( + object objectName, + string fieldName, + int delayInMinutes) + { + FieldInfo fieldInfo = objectName + .GetType() + .GetField( + name: fieldName, + bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); + + DateTime? fieldValue = (DateTime?)fieldInfo + .GetValue( + obj: objectName); + + fieldInfo + .SetValue( + obj: objectName, + value: ((DateTime)fieldValue).AddMinutes(delayInMinutes)); + } + } +} diff --git a/docs/ReplicaValidationDesign.md b/docs/ReplicaValidationDesign.md new file mode 100644 index 000000000..0e23db09d --- /dev/null +++ b/docs/ReplicaValidationDesign.md @@ -0,0 +1,338 @@ +# Design Approach to Validate Backend Replicas During Service Upgrade in Direct Mode + +## Table of Contents + +* [Scope.](#scope) +* [Backgraound.](#backgraound) +* [Proposed Solution.](#proposed-solution) +* [Design Approach.](#design-approach) + * [Outline.](#outline) + * [Updated Sequence Diagram for `CosmosClient` initialization.](#updated-sequence-diagram-for-cosmosclient-initialization) + * [Sequence Diagram when `StoreReader` invokes the `GatewayAddressCache` to resolve addresses and leverages `AddressEnumerator` to enumerate the transport addresses.](#sequence-diagram-when-storereader-invokes-the-gatewayaddresscache-to-resolve-addresses-and-leverages-addressenumerator-to-enumerate-the-transport-addresses) + * [State Diagram to Understand the `TransportAddressUri` Health State Transformations.](#state-diagram-to-understand-the-transportaddressuri-health-state-transformations) + * [`Microsoft.Azure.Cosmos.Direct` package class diagrams.](#azurecosmosdirect-package-class-diagrams) + * [`Microsoft.Azure.Cosmos` package class diagrams.](#microsoftazurecosmos-package-class-diagrams) +* [Pull Request with Sample Code Changes.](#pull-request-with-sample-code-changes) +* [References.](#references) + +## Scope + +The scope of the replica validation workstream is targed for the `CosmosClient` configured for 
`Direct` mode. + +## Background + +During an upgrade scenario in the backend replica nodes, there has been an observation of increased request latency. One of the primary reasons for the latency is that, during an upgrade, a replica which is still undergoing upgrade may still be returned to the SDK when an address refresh occurs. As of today, the incoming request has a `25%` chance of hitting a replica that is not ready yet, therefore causing the `ConnectionTimeoutException`, which contributes to the increased latency. + +To understand the problem statement better, please take a look at the below sequence diagram which reflects the connection timeouts caused by the replica upgrade. + +```mermaid +sequenceDiagram + autonumber + participant A as StoreReader
[Direct Code] + participant B as GlobalAddressResolver
[v3 Code] + participant C as GatewayAddressCache
[v3 Code] + participant D as GatewayService
[External Service] + participant E as BackendReplica
[A Replica Node Still Undergoing Upgrade
Address: rntbd://test.azure.com:443/partitions/2s] + A->>+B: Request (forceRefresh - false) + B->>+C: TryGetAddresses (forceRefresh - false) + C->>-B: Fetch Cached Addresses
rntbd://test.azure.com:443/partitions/2s + B->>-A: Return Addresses
rntbd://test.azure.com:443/partitions/2s + A->>+E: Request Sent to Backend Replica + E-x-A: Request fails with 410 GoneException + A->>+B: Request (forceRefresh - true)
GoneWithRetryAttempt + B->>+C: TryGetAddresses (forceRefresh - true) + C->>+D: GetServerAddresses + D->>-C: Returns the new refreshed addresses
rntbd://test.azure.com:443/partitions/2s + Note over D: Note that the returned addresses from
GatewayService may still be undergoing
the upgrade, and thus they are not in a ready state. + C->>-B: Returns the refreshed addresses
rntbd://test.azure.com:443/partitions/2s + B->>-A: Returns the refreshed addresses
rntbd://test.azure.com:443/partitions/2s + A->>+E: Request Sent to Backend Replica + E-x-A: Request fails again with 410 GoneException + Note over A: Note that the request fails to connect to the replica
which causes a "ConnectionTimeoutException". +``` + +## Proposed Solution + +The .NET SDK will track the replica endpoint health based on client side metrics, and de-prioritize any replica that was marked as `Unhealthy`. The SDK will validate the health of the replica by attempting to open RNTBD connections to the backend. When the SDK refreshes addresses from the gateway for a partition, **the SDK will only validate the replica/s which were in `Unhealthy` status, by opening RNTBD connection requests**. This process will be completed with best effort, which means: + +- If the validation can not finish within `1 min` of opening connections, the de-prioritization will stop for certain statuses. + +- The selection of the replica will not be blocked by the validation process. To better understand this - if a request needs to be sent to `N` replicas, and if there are only `N-1` replicas in good status, it will still go ahead selecting the `Nth` replica which needs validation. + +- It is opt in only for now, by setting the environment variable `AZURE_COSMOS_REPLICA_VALIDATION_ENABLED` to `true`. + +## Design Approach + +### Outline + +The basic idea for this design approach has been divided into *three* parts, which are described below in detail: + +- Maintain `4` new health statuses in the `TransportAddressUri`, which are: + + - **`Connected`** (Indicates that there is already a connection made successfully to the backend replica) + - **`Unknown`** (Indicates that the connection exists however the status is unknown at the moment) + - **`Unhealthy Pending`** (Indicates that the connection was unhealthy previously, but an attempt will be made to validate the replica to check the current status) + - **`Unhealthy`** (Indicates that the connection is unhealthy at the moment) + +- Validate the `Unhealthy` replicas returned from the Address Cache, by attempting to open the RNTBD connection.
Note that the validation task and the connection opening part are done as a background task, so that the address resolving doesn't wait on the RNTBD context negotiation to finish. + +- Leverage the `AddressEnumerator` to reorder the replicas by their health statuses. For instance, if the replica validation is enabled, the `AddressEnumerator` will reorder the replicas by sorting them in the order of **Connected/ Unknown** > **Unhealthy Pending** > **Unhealthy**. However, if the replica validation is disabled, the replicas will be sorted in the order of **Connected/ Unknown/ Unhealthy Pending** > **Unhealthy**. + +Prior to discussing the replica validation, it is very important to understand the changes in the flow while opening the RNTBD connections to the backend replica nodes, during the `CosmosClient` initialization. The changes in the flow are mentioned below as an updated sequence diagram. + +### Updated Sequence Diagram for `CosmosClient` initialization. + +```mermaid +sequenceDiagram + participant A as CosmosClient
[v3 Code] + participant B as ClientContextCore
[v3 Code] + participant C as DocumentClient
[v3 Code] + participant D as ServerStoreModel
[Direct Code] + participant K as StoreClientFactory
[Direct Code] + participant E as StoreClient
[Direct Code] + participant F as ReplicatedResourceClient
[Direct Code] + participant G as GlobalAddressResolver
[v3 Code] + participant H as GatewayAddressCache
[v3 Code] + participant J as RntbdOpenConnectionHandler
[Direct Code] + participant I as TransportClient
[Direct Code] + A->>B: 1. InitializeContainerWithRntbdAsync() + B->>C: 2. OpenConnectionsAsync() + C->>C: 2.1. CreateStoreModel() + C->>K: 3. CreateStoreClient(addressResolver) + K->>E: 4. new StoreClient(addressResolver) + E->>G: 5. SetOpenConnectionsHandler(new RntbdOpenConnectionHandler(transportClient)) + Note over E: a) Creates a new instance of RntbdOpenConnectionHandler
and sets it to the IAddressResolverExtension.
Note that the GlobalAddressResolver implements the
IAddressResolverExtension today.
b) Sets the IAddressResolverExtension to the Replicated
ResourceClient. + G->>H: 6. SetOpenConnectionsHandler(openConnectionHandler) + C->>D: 7. OpenConnectionsToAllReplicasAsync() + D->>E: 8. OpenConnectionsToAllReplicasAsync() + E->>F: 9. OpenConnectionsToAllReplicasAsync() + F->>G: 10. OpenConnectionsToAllReplicasAsync() + G->>G: 10.1 collection = collectionCache.
ResolveByNameAsync() + G->>G: 10.2 partitionKeyRanges = routingMapProvider.
TryGetOverlappingRangesAsync(FullRange) + G->>H: 11. OpenAsync
(partitionKeyRangeIdentities) + Note over G: Resolves the collection by the
container link url and fetches
the partition key full ranges. + H->>H: 11.1 GetServerAddressesViaGatewayAsync() + H->>J: 12. OpenRntbdChannelsAsync() + Note over H: Gets the transport address uris from address info
and invokes the RntbdOpenConnectionHandler
OpenRntbdChannelsAsync() method with the transport uris
to establish the Rntbd connection. + J->>I: 13. OpenConnectionAsync()
using the resolved transport address uris. +``` + +Now that we are well aware of the changes in the RNTBD open connection flow, let's leverage the changes in the replica validation flow. The below sequence diagram describes the request and response flow from `StoreReader` (present in the `Cosmos.Direct` namespace) to the `GatewayAddressCache` (present in the `Microsoft.Azure.Cosmos` namespace), during the read request to backend replica/s. + +### Sequence Diagram when `StoreReader` invokes the `GatewayAddressCache` to resolve addresses and leverages `AddressEnumerator` to enumerate the transport addresses. + +```mermaid +sequenceDiagram + participant A as StoreReader
[Direct Code] + participant B as AddressSelector
[v3 Code] + participant C as GlobalAddressResolver
[v3 Code] + participant D as AddressResolver
[v3 Code] + participant E as GatewayAddressCache
[v3 Code] + participant F as RntbdOpenConnectionHandler
[Direct Code] + participant G as AsyncCacheNonBlocking
[v3 Code] + participant H as GatewayService
[external service] + participant I as AddressEnumerator
[Direct Code] + participant J as TransportClient
[Direct Code] + participant K as Channel
[Direct Code] + participant L as TransportAddressUri
[Direct Code] + A->>A: 1. ReadMultipleReplicaAsync() + A->>B: 2. ResolveAllTransportAddressUriAsync(forceRefresh - false) + B->>C: 3. ResolveAsync(forceRefresh - false) + C->>D: 4. ResolveAsync(forceRefresh - false) + D->>D: 5. ResolveAddressesAndIdentityAsync() + D->>E: 6. TryGetAddressesAsync(forceRefresh - false) + E->>G: 7. GetAsync ("singleValueInitFunc delegate", "forceRefresh - false") + Note over L: Initial health status of a
TransportAddressUri is "Unknown". + Note over E: Passes the SingleValueInitFunc delegate
to async nonblocking cache. + G->>E: 8. Returns the cached addresses + E->>D: 9. Returns the resolved addresses + D->>C: 10. Returns the resolved addresses + C->>B: 11. Returns the resolved addresses + B->>A: 12. Returns the resolved addresses + A->>A: 13. Request fails with "GoneException" + Note over A: Sets Force Refresh
header to true. + A->>A: 14. ReadMultipleReplicaAsync() + A->>B: 15. ResolveAllTransportAddressUriAsync (forceRefresh - true) + B->>C: 16. ResolveAsync(forceRefresh - true) + C->>D: 17. ResolveAsync(forceRefresh - true) + D->>D: 18. ResolveAddressesAndIdentityAsync(forceRefresh - true) + D->>E: 19. TryGetAddressesAsync(forceRefresh - true) + E->>G: 20. GetAsync ("singleValueInitFunc delegate", "forceRefresh - true") + Note over E: Passes the SingleValueInitFunc delegate
to async nonblocking cache. + G->>E: 21. Invokes the singleValueInitFunc delegate + E->>E: 22. SetTransportAddressUrisToUnhealthy(currentCachedValue, failedEndpoints) + Note over E: Sets the failed TransportAddressUris
to an "Unhealthy" status. + E->>L: 23. SetUnhealthy() + E->>E: 24. GetAddressesForRangeIdAsync(forceRefresh - true, cachedAddresses) + E->>H: 25. Invokes the GatewayService using GetServerAddressesViaGatewayAsync()
to receive new addresses. + H->>E: 26. Receives the resolved new addresses. + E->>E: 27. MergeAddresses + Note over E: The purpose of the merge is to restore the health statuses of all the new addresses to
that of their respective cached addresses, if the same addresses are returned. + E->>L: 28. SetRefreshedIfUnhealthy() + Note over E: Sets any TransportAddressUri with
an "Unhealthy" status to "UnhealthyPending". + E->>E: 29. ValidateReplicaAddresses(mergedTransportAddress) + Note over E: Validates the backend replicas,
if the replica validation env variable is enabled. + E-->>F: 30. OpenRntbdChannelsAsync(mergedTransportAddress) + Note over E: If replica validation is enabled, then validate unhealthy pending
replicas by opening RNTBD connections. Note that this
operation executes as a background task. + F-->>J: 31. OpenConnectionAsync()
using the resolved transport address uris + J-->>K: 32. OpenConnectionAsync()
using the resolved physical address uris + K-->>L: 33. SetConnected() + Note over K: Initializes and Establishes a RNTBD
context negotiation to the backend replica nodes. + E->>G: 34. Returns the merged addresses to cache and store into the Async Nonblocking Cache + G->>E: 35. Returns the resolved addresses + E->>E: 36. ShouldRefreshHealthStatus() + Note over E: Refresh the cache if there was an address
that has been marked as unhealthy long enough (more than a minute)
and needs its status revalidated. + E-->>G: 37. Refresh ("GetAddressesForRangeIdAsync() as the singleValueInitFunc delegate", "forceRefresh - true") + Note over E: Note that the refresh operation
happens as a background task. + E->>D: 38. Returns the resolved addresses + D->>C: 39. Returns the resolved addresses + C->>B: 40. Returns the resolved addresses + B->>A: 41. Returns the resolved transport addresses + A->>I: 42. GetTransportAddresses("transportAddressUris", "replicaAddressValidationEnabled") + I->>I: 43. ReorderReplicasByHealthStatus() + Note over I: Re-orders the transport addresses
by their health statuses
Connected/Unknown >> UnhealthyPending >> Unhealthy. + I->>A: 44. Returns the transport addresses re-ordered by their health statuses +``` +### State Diagram to Understand the `TransportAddressUri` Health State Transformations. + +To better understand the design, it is critical to understand the `TransportAddressUri` health state transformations. The below state diagram depicts the `TransportAddressUri` state transitions in detail. + +```mermaid + stateDiagram-v2 + [*] --> Unknown + note right of Unknown + Initial state of
a TransportAddressUri + end note + Unknown --> Connected: Channel Acquire
Successful + Unknown --> Unhealthy: Channel Acquire
Failed + Unhealthy --> Connected: Channel Acquire
Successful after 1 Min + Unhealthy --> UnhealthyPending: Refresh Addresses
from Gateway when
Replica Validation
is enabled + UnhealthyPending --> Unhealthy: RntbdOpenConnectionHandler -
Channel Acquire
Failed + UnhealthyPending --> Connected: RntbdOpenConnectionHandler -
Channel Acquire
Successful + Connected --> Unhealthy: Request failed with 410
GoneException and
force refresh + Connected --> [*] +``` + +To accomplish the above changes in the replica validation flow, below are the class diagrams and the proposed code changes in both `Microsoft.Azure.Cosmos.Direct` and `Microsoft.Azure.Cosmos` packages. + +### `Microsoft.Azure.Cosmos.Direct` package class diagrams. + +Introduce a new `IOpenConnectionsHandler` interface with `OpenRntbdChannelsAsync()` method. Create a new class `RntbdOpenConnectionHandler` that will eventually implement the `IOpenConnectionsHandler` interface and override the `OpenRntbdChannelsAsync()` method to establish Rntbd connections to the transport address uris. Note that, this class will also add the concurrency control logic, so that any burst of connection creation could be avoided. The below class diagram depicts the same behavior. + +```mermaid +classDiagram + IOpenConnectionsHandler --|> RntbdOpenConnectionHandler : implements + <> IOpenConnectionsHandler + IOpenConnectionsHandler: +OpenRntbdChannelsAsync(IReadOnlyList~TransportAddressUri~ addresses) + class RntbdOpenConnectionHandler{ + -TransportClient transportClient + -SemaphoreSlim semaphore + -int SemaphoreAcquireTimeoutInMillis + +OpenRntbdChannelsAsync() + } +``` + +Extend the `IAddressResolverExtension` interface with `SetOpenConnectionsHandler()` method. The benefits and the utilizations are provided below: + +- The `GlobalAddressResolver` can then implement the `SetOpenConnectionsHandler()` method, which will be invoked by the `StoreClient` constructor to set the `RntbdOpenConnectionHandler`. + +- The `OpenConnectionsAsync()` method present inside `GlobalAddressResolver`, will be invoked from the `ReplicatedResourceClient` eventually. The `AddressResolver` class can simply implement the method/s and return an empty `Task`. 
The `GlobalAddressResolver`.`OpenConnectionsAsync()` is responsible for + + - Resolving the collection by the database name and container link, + - Fetching the partition key full ranges and + - Invoking the `GatewayAddressCache` for the preferred region, with the `RntbdOpenConnectionHandler` instance, passed by the `StoreClient`. + +The below class diagram depicts the same behavior. + +*[Note: The `IAddressResolverExtension` was introduced to hold the new methods, which could break the existing build, if put directly into `IAddressResolver` interface. The `IAddressResolverExtension` will be removed eventually and the existing methods will be moved into `IAddressResolver` interface.]* + +```mermaid +classDiagram + IAddressResolver --|> IAddressResolverExtension : extends + IAddressResolverExtension --|> GlobalAddressResolver : implements + <> IAddressResolver + <> IAddressResolverExtension + IAddressResolverExtension: +OpenConnectionsAsync(string databaseName, string containerLinkUri) + IAddressResolverExtension: +SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionHandler) + class GlobalAddressResolver{ + +OpenConnectionsAsync() + +SetOpenConnectionsHandler() + } +``` + +Update the method definition of `GetTransportAddresses()` present in `IAddressEnumerator` to add a new `boolean` argument `replicaAddressValidationEnabled`. This will help to choose the correct set of replica/s when replica validation is enabled or disabled. Additionally, a new private method `ReorderReplicasByHealthStatus()` will be added in `AddressEnumerator` to re-order the transport uri/s by their health status priority. The below class diagram depicts the same changes. 
+ +```mermaid +classDiagram + IAddressEnumerator --|> AddressEnumerator : implements + <<Interface>> IAddressEnumerator + IAddressEnumerator: +GetTransportAddresses(IReadOnlyList~TransportAddressUri~ transportAddressUris, Lazy~HashSet~TransportAddressUri~~ failedEndpoints, bool replicaAddressValidationEnabled) IEnumerable~TransportAddressUri~ + class AddressEnumerator{ + +GetTransportAddresses() + -ReorderReplicasByHealthStatus(IReadOnlyList~TransportAddressUri~ transportAddressUris, Lazy~HashSet~TransportAddressUri~~ failedEndpoints, bool replicaAddressValidationEnabled) IEnumerable~TransportAddressUri~ + } +``` + +The `TransportAddressUri` class will be updated eventually, with a critical set of changes required for the replica validation workstream. It will introduce an enum `HealthStatus` with `4` new health statuses - `Connected`, `Unknown`, `UnhealthyPending` and `Unhealthy` to re-order the replicas with their status priorities. The setters will help to capture the correct health state of the `TransportAddressUri` at any given point of time. The below class diagram depicts the same behavior. + +```mermaid +classDiagram + class TransportAddressUri { + -DateTime? lastUnknownTimestamp + -DateTime? lastUnhealthyPendingTimestamp + -DateTime? lastUnhealthyTimestamp + -HealthStatus healthStatus + -ReaderWriterLockSlim healthStatusLock + +SetConnected() + +SetRefreshedIfUnhealthy() + +SetHealthStatus(HealthStatus status, bool forceSet) + +GetHealthStatus() HealthStatus + +GetEffectiveHealthStatus() HealthStatus + +ShouldRefreshHealthStatus() bool + } + + class HealthStatus { + <<enumeration>> + Connected : 100 + Unknown : 200 + UnhealthyPending : 300 + Unhealthy : 400 + } +``` + +### `Microsoft.Azure.Cosmos` package class diagrams. + +Extend the `IAddressCache` interface with `SetOpenConnectionsHandler()` method.
This method should take an instance of the `RntbdOpenConnectionHandler` (which implements the `IOpenConnectionsHandler`) and set it on a private field for later usage, to establish the Rntbd connectivity to the backend replica nodes. + +```mermaid +classDiagram + IAddressCache --|> GatewayAddressCache : implements + <<Interface>> IAddressCache + IAddressCache: +SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHandler) + class GatewayAddressCache { + -IOpenConnectionsHandler openConnectionsHandler + -string replicaValidationVariableName + +SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHandler) + -MergeAddresses(PartitionAddressInformation newAddresses, PartitionAddressInformation cachedAddresses) PartitionAddressInformation + -ValidateReplicaAddresses(IReadOnlyList~TransportAddressUri~ addresses) + } +``` + +Add a new method `RefreshAsync()` into the `AsyncCacheNonBlocking` to force refresh the address cache on demand. Note that `RefreshAsync()` is used to force refresh any `Unhealthy` replica nodes, which have been in `Unhealthy` state for more than `1` minute. That way, any `Unhealthy` replica nodes will be back into the validation pool, which will eventually be useful to avoid any skewed replica selection. + +```mermaid +classDiagram + class AsyncCacheNonBlocking~TKey, TValue~ { + +Task RefreshAsync(TKey key, Func~TValue, Task~TValue~~ singleValueInitFunc) + } +``` + +## Pull Request with Sample Code Changes + +Here is the [link to a sample PR](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/3570) which provides an overview of the incoming changes. + +## References + +- [Mermaid Documentation.](https://mermaid-js.github.io/mermaid/#/) +- [Upgrade Resiliency Tasks List.](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/3409) +- [Design Document to Utilize RNTBD Context Negotiation During `CosmosClient` Initialization.](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/3442) \ No newline at end of file