[Internal] Upgrade Resiliency: Adds Code to Enable Replica Validation Feature for Preview (#3951)
* Code changes to add replica validation feature in cosmos client options. * Code changes to upgrade the cosmos direct version to 3.31.3. * Adding emulator test to cover replica validation. * Code changes to address cosmetic clean ups. * Code changes to address review comments. Fixed preview build failures. * Code changes to enable replica validation for preview package by default. * Code changes to address review comments. * Code changes to fix preview unit tests. * Code changes to disable environment variable at the end of the test.
This commit is contained in:
Родитель
a25730a77a
Коммит
17f1b4eef8
|
@ -3,7 +3,7 @@
|
|||
<ClientOfficialVersion>3.35.1</ClientOfficialVersion>
|
||||
<ClientPreviewVersion>3.35.1</ClientPreviewVersion>
|
||||
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
|
||||
<DirectVersion>3.31.2</DirectVersion>
|
||||
<DirectVersion>3.31.3</DirectVersion>
|
||||
<EncryptionOfficialVersion>2.0.2</EncryptionOfficialVersion>
|
||||
<EncryptionPreviewVersion>2.0.2</EncryptionPreviewVersion>
|
||||
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
|
||||
|
|
|
@ -113,6 +113,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
private const string DefaultInitTaskKey = "InitTaskKey";
|
||||
|
||||
private readonly bool IsLocalQuorumConsistency = false;
|
||||
private readonly bool isReplicaAddressValidationEnabled;
|
||||
|
||||
//Auth
|
||||
internal readonly AuthorizationTokenProvider cosmosAuthorization;
|
||||
|
||||
|
@ -231,7 +233,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
|
||||
this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
|
||||
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
|
||||
|
||||
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -6700,7 +6702,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness),
|
||||
!this.enableRntbdChannel,
|
||||
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
|
||||
true);
|
||||
true,
|
||||
enableReplicaValidation: this.isReplicaAddressValidationEnabled);
|
||||
|
||||
if (subscribeRntbdStatus)
|
||||
{
|
||||
|
|
|
@ -66,7 +66,8 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
CosmosHttpClient httpClient,
|
||||
IOpenConnectionsHandler openConnectionsHandler,
|
||||
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
|
||||
bool enableTcpConnectionEndpointRediscovery = false)
|
||||
bool enableTcpConnectionEndpointRediscovery = false,
|
||||
bool replicaAddressValidationEnabled = false)
|
||||
{
|
||||
this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment);
|
||||
this.protocol = protocol;
|
||||
|
@ -90,9 +91,7 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
GatewayAddressCache.ProtocolString(this.protocol));
|
||||
|
||||
this.openConnectionsHandler = openConnectionsHandler;
|
||||
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable<bool>(
|
||||
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
|
||||
defaultValue: false);
|
||||
this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled;
|
||||
}
|
||||
|
||||
public Uri ServiceEndpoint => this.serviceEndpoint;
|
||||
|
@ -158,7 +157,8 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
collectionRid: collection.ResourceId,
|
||||
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId),
|
||||
containerProperties: collection,
|
||||
shouldOpenRntbdChannels: shouldOpenRntbdChannels));
|
||||
shouldOpenRntbdChannels: shouldOpenRntbdChannels,
|
||||
cancellationToken: cancellationToken));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -348,12 +348,14 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
/// <param name="partitionKeyRangeIds">An instance of <see cref="IEnumerable{T}"/> containing the list of partition key range ids.</param>
|
||||
/// <param name="containerProperties">An instance of <see cref="ContainerProperties"/> containing the collection properties.</param>
|
||||
/// <param name="shouldOpenRntbdChannels">A boolean flag indicating whether Rntbd connections are required to be established to the backend replica nodes.</param>
|
||||
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
|
||||
private async Task WarmupCachesAndOpenConnectionsAsync(
|
||||
DocumentServiceRequest request,
|
||||
string collectionRid,
|
||||
IEnumerable<string> partitionKeyRangeIds,
|
||||
ContainerProperties containerProperties,
|
||||
bool shouldOpenRntbdChannels)
|
||||
bool shouldOpenRntbdChannels,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
TryCatch<DocumentServiceResponse> documentServiceResponseWrapper = await this.GetAddressesAsync(
|
||||
request: request,
|
||||
|
@ -381,6 +383,11 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
List<Task> openConnectionTasks = new ();
|
||||
foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
this.serverPartitionAddressCache.Set(
|
||||
new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
|
||||
addressInfo.Item2);
|
||||
|
@ -398,10 +405,7 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
}
|
||||
}
|
||||
|
||||
if (openConnectionTasks.Any())
|
||||
{
|
||||
await Task.WhenAll(openConnectionTasks);
|
||||
}
|
||||
await Task.WhenAll(openConnectionTasks);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
|
|
@ -27,7 +27,6 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDisposable
|
||||
{
|
||||
private const int MaxBackupReadRegions = 3;
|
||||
|
||||
private readonly GlobalEndpointManager endpointManager;
|
||||
private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache;
|
||||
private readonly Protocol protocol;
|
||||
|
@ -39,6 +38,7 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
private readonly CosmosHttpClient httpClient;
|
||||
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
|
||||
private readonly bool enableTcpConnectionEndpointRediscovery;
|
||||
private readonly bool isReplicaAddressValidationEnabled;
|
||||
private IOpenConnectionsHandler openConnectionsHandler;
|
||||
|
||||
public GlobalAddressResolver(
|
||||
|
@ -67,6 +67,8 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
|
||||
this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery;
|
||||
|
||||
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
|
||||
|
||||
this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)
|
||||
|
||||
this.addressCacheByEndpoint = new ConcurrentDictionary<Uri, EndpointCache>();
|
||||
|
@ -281,7 +283,8 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
this.serviceConfigReader,
|
||||
this.httpClient,
|
||||
this.openConnectionsHandler,
|
||||
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery);
|
||||
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
|
||||
replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled);
|
||||
|
||||
string location = this.endpointManager.GetLocation(endpoint);
|
||||
AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location);
|
||||
|
|
|
@ -7,7 +7,14 @@ namespace Microsoft.Azure.Cosmos
|
|||
using System;
|
||||
|
||||
internal static class ConfigurationManager
|
||||
{
|
||||
{
|
||||
/// <summary>
|
||||
/// A read-only string containing the environment variablename for enabling replica validation.
|
||||
/// This will eventually be removed oncereplica valdiatin is enabled by default for both preview
|
||||
/// and GA.
|
||||
/// </summary>
|
||||
internal static readonly string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED";
|
||||
|
||||
public static T GetEnvironmentVariable<T>(string variable, T defaultValue)
|
||||
{
|
||||
string value = Environment.GetEnvironmentVariable(variable);
|
||||
|
@ -17,5 +24,26 @@ namespace Microsoft.Azure.Cosmos
|
|||
}
|
||||
return (T)Convert.ChangeType(value, typeof(T));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the boolean value of the replica validation environment variable. Note that, replica validation
|
||||
/// is enabled by default for the preview package and disabled for GA at the moment. The user can set the
|
||||
/// respective environment variable 'AZURE_COSMOS_REPLICA_VALIDATION_ENABLED' to override the value for
|
||||
/// both preview and GA. The method will eventually be removed, once replica valdiatin is enabled by default
|
||||
/// for both preview and GA.
|
||||
/// </summary>
|
||||
/// <returns>A boolean flag indicating if replica validation is enabled.</returns>
|
||||
public static bool IsReplicaAddressValidationEnabled()
|
||||
{
|
||||
bool replicaValidationDefaultValue = false;
|
||||
#if PREVIEW
|
||||
replicaValidationDefaultValue = true;
|
||||
#endif
|
||||
|
||||
return ConfigurationManager
|
||||
.GetEnvironmentVariable(
|
||||
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
|
||||
defaultValue: replicaValidationDefaultValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -209,7 +209,8 @@
|
|||
enableReadRequestsFallback: false,
|
||||
useMultipleWriteLocations: useMultipleWriteLocations,
|
||||
detectClientConnectivityIssues: true,
|
||||
disableRetryWithRetryPolicy: false);
|
||||
disableRetryWithRetryPolicy: false,
|
||||
enableReplicaValidation: false);
|
||||
|
||||
// Reducing retry timeout to avoid long-running tests
|
||||
replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1;
|
||||
|
|
|
@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.Tests
|
|||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
|
@ -14,44 +13,50 @@ namespace Microsoft.Azure.Cosmos.Tests
|
|||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using global::Azure.Core;
|
||||
using Microsoft.Azure.Cosmos.Fluent;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
using Moq;
|
||||
using Newtonsoft.Json.Linq;
|
||||
|
||||
[TestClass]
|
||||
public class CosmosBadReplicaTests
|
||||
{
|
||||
[TestMethod]
|
||||
[Timeout(30000)]
|
||||
public async Task TestGoneFromServiceScenarioAsync()
|
||||
[DataRow(true, DisplayName = "Validate when replica validation is enabled.")]
|
||||
[DataRow(false, DisplayName = "Validate when replica validation is disabled.")]
|
||||
public async Task TestGoneFromServiceScenarioAsync(
|
||||
bool enableReplicaValidation)
|
||||
{
|
||||
Mock<IHttpHandler> mockHttpHandler = new Mock<IHttpHandler>(MockBehavior.Strict);
|
||||
Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount(
|
||||
"mockAccountInfo",
|
||||
consistencyLevel: ConsistencyLevel.Session,
|
||||
mockHttpHandler,
|
||||
out string primaryRegionEndpoint);
|
||||
try
|
||||
{
|
||||
Environment.SetEnvironmentVariable(
|
||||
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
|
||||
value: enableReplicaValidation.ToString());
|
||||
|
||||
string databaseName = "mockDbName";
|
||||
string containerName = "mockContainerName";
|
||||
string containerRid = "ccZ1ANCszwk=";
|
||||
Documents.ResourceId cRid = Documents.ResourceId.Parse(containerRid);
|
||||
MockSetupsHelper.SetupContainerProperties(
|
||||
mockHttpHandler: mockHttpHandler,
|
||||
regionEndpoint: primaryRegionEndpoint,
|
||||
databaseName: databaseName,
|
||||
containerName: containerName,
|
||||
containerRid: containerRid);
|
||||
Mock<IHttpHandler> mockHttpHandler = new Mock<IHttpHandler>(MockBehavior.Strict);
|
||||
Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount(
|
||||
"mockAccountInfo",
|
||||
consistencyLevel: ConsistencyLevel.Session,
|
||||
mockHttpHandler,
|
||||
out string primaryRegionEndpoint);
|
||||
|
||||
MockSetupsHelper.SetupSinglePartitionKeyRange(
|
||||
mockHttpHandler,
|
||||
primaryRegionEndpoint,
|
||||
cRid,
|
||||
out IReadOnlyList<string> partitionKeyRanges);
|
||||
string databaseName = "mockDbName";
|
||||
string containerName = "mockContainerName";
|
||||
string containerRid = "ccZ1ANCszwk=";
|
||||
Documents.ResourceId cRid = Documents.ResourceId.Parse(containerRid);
|
||||
MockSetupsHelper.SetupContainerProperties(
|
||||
mockHttpHandler: mockHttpHandler,
|
||||
regionEndpoint: primaryRegionEndpoint,
|
||||
databaseName: databaseName,
|
||||
containerName: containerName,
|
||||
containerRid: containerRid);
|
||||
|
||||
List<string> replicaIds1 = new List<string>()
|
||||
MockSetupsHelper.SetupSinglePartitionKeyRange(
|
||||
mockHttpHandler,
|
||||
primaryRegionEndpoint,
|
||||
cRid,
|
||||
out IReadOnlyList<string> partitionKeyRanges);
|
||||
|
||||
List<string> replicaIds1 = new List<string>()
|
||||
{
|
||||
"11111111111111111",
|
||||
"22222222222222222",
|
||||
|
@ -59,14 +64,14 @@ namespace Microsoft.Azure.Cosmos.Tests
|
|||
"44444444444444444",
|
||||
};
|
||||
|
||||
HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
|
||||
replicaIds1,
|
||||
partitionKeyRanges.First(),
|
||||
"eastus",
|
||||
cRid);
|
||||
HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
|
||||
replicaIds1,
|
||||
partitionKeyRanges.First(),
|
||||
"eastus",
|
||||
cRid);
|
||||
|
||||
// One replica changed on the refresh
|
||||
List<string> replicaIds2 = new List<string>()
|
||||
// One replica changed on the refresh
|
||||
List<string> replicaIds2 = new List<string>()
|
||||
{
|
||||
"11111111111111111",
|
||||
"22222222222222222",
|
||||
|
@ -74,118 +79,134 @@ namespace Microsoft.Azure.Cosmos.Tests
|
|||
"55555555555555555",
|
||||
};
|
||||
|
||||
HttpResponseMessage replicaSet2 = MockSetupsHelper.CreateAddresses(
|
||||
replicaIds2,
|
||||
partitionKeyRanges.First(),
|
||||
"eastus",
|
||||
cRid);
|
||||
HttpResponseMessage replicaSet2 = MockSetupsHelper.CreateAddresses(
|
||||
replicaIds2,
|
||||
partitionKeyRanges.First(),
|
||||
"eastus",
|
||||
cRid);
|
||||
|
||||
bool delayCacheRefresh = true;
|
||||
bool delayRefreshUnblocked = false;
|
||||
mockHttpHandler.SetupSequence(x => x.SendAsync(
|
||||
It.Is<HttpRequestMessage>(r => r.RequestUri.ToString().Contains("addresses")), It.IsAny<CancellationToken>()))
|
||||
.Returns(Task.FromResult(replicaSet1))
|
||||
.Returns(async ()=>
|
||||
{
|
||||
//block cache refresh to verify bad replica is not visited during refresh
|
||||
while (delayCacheRefresh)
|
||||
bool delayCacheRefresh = true;
|
||||
bool delayRefreshUnblocked = false;
|
||||
mockHttpHandler.SetupSequence(x => x.SendAsync(
|
||||
It.Is<HttpRequestMessage>(r => r.RequestUri.ToString().Contains("addresses")), It.IsAny<CancellationToken>()))
|
||||
.Returns(Task.FromResult(replicaSet1))
|
||||
.Returns(async ()=>
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(20));
|
||||
//block cache refresh to verify bad replica is not visited during refresh
|
||||
while (delayCacheRefresh)
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(20));
|
||||
}
|
||||
|
||||
delayRefreshUnblocked = true;
|
||||
return replicaSet2;
|
||||
});
|
||||
|
||||
int callBack = 0;
|
||||
List<Documents.TransportAddressUri> urisVisited = new List<Documents.TransportAddressUri>();
|
||||
Mock<Documents.TransportClient> mockTransportClient = new Mock<Documents.TransportClient>(MockBehavior.Strict);
|
||||
mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(It.IsAny<Documents.TransportAddressUri>(), It.IsAny<Documents.DocumentServiceRequest>()))
|
||||
.Callback<Documents.TransportAddressUri, Documents.DocumentServiceRequest>((t, _) => urisVisited.Add(t))
|
||||
.Returns(() =>
|
||||
{
|
||||
callBack++;
|
||||
if (callBack == 1)
|
||||
{
|
||||
throw Documents.Rntbd.TransportExceptions.GetGoneException(
|
||||
new Uri("https://localhost:8081"),
|
||||
Guid.NewGuid(),
|
||||
new Documents.TransportException(Documents.TransportErrorCode.ConnectionBroken,
|
||||
null,
|
||||
Guid.NewGuid(),
|
||||
new Uri("https://localhost:8081"),
|
||||
"Mock",
|
||||
userPayload: true,
|
||||
payloadSent: false));
|
||||
}
|
||||
|
||||
delayRefreshUnblocked = true;
|
||||
return replicaSet2;
|
||||
return Task.FromResult(new Documents.StoreResponse()
|
||||
{
|
||||
Status = 200,
|
||||
Headers = new Documents.Collections.StoreResponseNameValueCollection()
|
||||
{
|
||||
ActivityId = Guid.NewGuid().ToString(),
|
||||
LSN = "12345",
|
||||
PartitionKeyRangeId = "0",
|
||||
GlobalCommittedLSN = "12345",
|
||||
SessionToken = "1#12345#1=12345"
|
||||
},
|
||||
ResponseBody = new MemoryStream()
|
||||
});
|
||||
});
|
||||
|
||||
int callBack = 0;
|
||||
List<Documents.TransportAddressUri> urisVisited = new List<Documents.TransportAddressUri>();
|
||||
Mock<Documents.TransportClient> mockTransportClient = new Mock<Documents.TransportClient>(MockBehavior.Strict);
|
||||
mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(It.IsAny<Documents.TransportAddressUri>(), It.IsAny<Documents.DocumentServiceRequest>()))
|
||||
.Callback<Documents.TransportAddressUri, Documents.DocumentServiceRequest>((t, _) => urisVisited.Add(t))
|
||||
.Returns(() =>
|
||||
{
|
||||
callBack++;
|
||||
if (callBack == 1)
|
||||
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
|
||||
{
|
||||
throw Documents.Rntbd.TransportExceptions.GetGoneException(
|
||||
new Uri("https://localhost:8081"),
|
||||
Guid.NewGuid(),
|
||||
new Documents.TransportException(Documents.TransportErrorCode.ConnectionBroken,
|
||||
null,
|
||||
Guid.NewGuid(),
|
||||
new Uri("https://localhost:8081"),
|
||||
"Mock",
|
||||
userPayload: true,
|
||||
payloadSent: false));
|
||||
}
|
||||
ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
|
||||
HttpClientFactory = () => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object)),
|
||||
TransportClientHandlerFactory = (original) => mockTransportClient.Object,
|
||||
};
|
||||
|
||||
return Task.FromResult(new Documents.StoreResponse()
|
||||
using (CosmosClient customClient = new CosmosClient(
|
||||
endpoint.ToString(),
|
||||
Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())),
|
||||
cosmosClientOptions))
|
||||
{
|
||||
Status = 200,
|
||||
Headers = new Documents.Collections.StoreResponseNameValueCollection()
|
||||
try
|
||||
{
|
||||
ActivityId = Guid.NewGuid().ToString(),
|
||||
LSN = "12345",
|
||||
PartitionKeyRangeId = "0",
|
||||
GlobalCommittedLSN = "12345",
|
||||
SessionToken = "1#12345#1=12345"
|
||||
},
|
||||
ResponseBody = new MemoryStream()
|
||||
});
|
||||
});
|
||||
Container container = customClient.GetContainer(databaseName, containerName);
|
||||
|
||||
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
|
||||
{
|
||||
ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
|
||||
HttpClientFactory = () => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object)),
|
||||
TransportClientHandlerFactory = (original) => mockTransportClient.Object,
|
||||
};
|
||||
for (int i = 0; i < 20; i++)
|
||||
{
|
||||
ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString()));
|
||||
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
|
||||
}
|
||||
|
||||
using (CosmosClient customClient = new CosmosClient(
|
||||
endpoint.ToString(),
|
||||
Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())),
|
||||
cosmosClientOptions))
|
||||
{
|
||||
try
|
||||
{
|
||||
Container container = customClient.GetContainer(databaseName, containerName);
|
||||
mockTransportClient.VerifyAll();
|
||||
mockHttpHandler.VerifyAll();
|
||||
|
||||
for (int i = 0; i < 20; i++)
|
||||
{
|
||||
ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString()));
|
||||
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
|
||||
Documents.TransportAddressUri failedReplica = urisVisited.First();
|
||||
|
||||
// With replica validation enabled in preview mode, the failed replica will be validated as a part of the flow,
|
||||
// and because any subsequent validation/ connection will be successful, the failed replica will now be marked
|
||||
// as connected, thus it will be visited more than once. However, note that when replice validation is disabled,
|
||||
// no validation is done thus the URI will be marked as unhealthy as expected. Therefore the uri will be visited
|
||||
// just once.
|
||||
Assert.IsTrue(
|
||||
enableReplicaValidation
|
||||
? urisVisited.Any(x => x.Equals(failedReplica))
|
||||
: urisVisited.Count(x => x.Equals(failedReplica)) == 1);
|
||||
|
||||
urisVisited.Clear();
|
||||
delayCacheRefresh = false;
|
||||
do
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(100));
|
||||
}while (!delayRefreshUnblocked);
|
||||
|
||||
for (int i = 0; i < 20; i++)
|
||||
{
|
||||
ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString()));
|
||||
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
|
||||
}
|
||||
|
||||
Assert.AreEqual(4, urisVisited.ToHashSet().Count());
|
||||
|
||||
// Clears all the setups. No network calls should be done on the next operation.
|
||||
mockHttpHandler.Reset();
|
||||
mockTransportClient.Reset();
|
||||
}
|
||||
|
||||
mockTransportClient.VerifyAll();
|
||||
mockHttpHandler.VerifyAll();
|
||||
|
||||
Documents.TransportAddressUri failedReplica = urisVisited.First();
|
||||
Assert.AreEqual(1, urisVisited.Count(x => x.Equals(failedReplica)));
|
||||
|
||||
urisVisited.Clear();
|
||||
delayCacheRefresh = false;
|
||||
do
|
||||
finally
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(100));
|
||||
}while (!delayRefreshUnblocked);
|
||||
|
||||
for (int i = 0; i < 20; i++)
|
||||
{
|
||||
ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString()));
|
||||
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
|
||||
mockTransportClient.Setup(x => x.Dispose());
|
||||
}
|
||||
|
||||
Assert.AreEqual(4, urisVisited.ToHashSet().Count());
|
||||
|
||||
// Clears all the setups. No network calls should be done on the next operation.
|
||||
mockHttpHandler.Reset();
|
||||
mockTransportClient.Reset();
|
||||
}
|
||||
finally
|
||||
{
|
||||
mockTransportClient.Setup(x => x.Dispose());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
Environment.SetEnvironmentVariable(
|
||||
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
|
||||
value: null);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1000,19 +1000,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
MockCosmosUtil.CreateCosmosHttpClient(() => httpClient),
|
||||
openConnectionsHandler: fakeOpenConnectionHandler,
|
||||
suboptimalPartitionForceRefreshIntervalInSeconds: 2,
|
||||
enableTcpConnectionEndpointRediscovery: true);
|
||||
|
||||
// By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature
|
||||
// for the purpose of this test.
|
||||
FieldInfo fieldInfo = cache
|
||||
.GetType()
|
||||
.GetField(
|
||||
name: "isReplicaAddressValidationEnabled",
|
||||
bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic);
|
||||
|
||||
fieldInfo.SetValue(
|
||||
obj: cache,
|
||||
value: true);
|
||||
enableTcpConnectionEndpointRediscovery: true,
|
||||
replicaAddressValidationEnabled: true);
|
||||
|
||||
DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid);
|
||||
|
||||
|
@ -1156,19 +1145,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
MockCosmosUtil.CreateCosmosHttpClient(() => httpClient),
|
||||
openConnectionsHandler: fakeOpenConnectionHandler,
|
||||
suboptimalPartitionForceRefreshIntervalInSeconds: 2,
|
||||
enableTcpConnectionEndpointRediscovery: true);
|
||||
|
||||
// By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature
|
||||
// for the purpose of this test.
|
||||
FieldInfo fieldInfo = cache
|
||||
.GetType()
|
||||
.GetField(
|
||||
name: "isReplicaAddressValidationEnabled",
|
||||
bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic);
|
||||
|
||||
fieldInfo.SetValue(
|
||||
obj: cache,
|
||||
value: true);
|
||||
enableTcpConnectionEndpointRediscovery: true,
|
||||
replicaAddressValidationEnabled: true);
|
||||
|
||||
DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid);
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
TransportClient mockTransportClient = this.GetMockTransportClient();
|
||||
ISessionContainer sessionContainer = new SessionContainer(string.Empty);
|
||||
|
||||
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer);
|
||||
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false);
|
||||
|
||||
Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
|
||||
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
|
||||
|
|
|
@ -536,7 +536,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
new StoreReader(mockTransportClient,
|
||||
addressSelector,
|
||||
new AddressEnumerator(),
|
||||
sessionContainer);
|
||||
sessionContainer,
|
||||
enableReplicaValidation: false);
|
||||
|
||||
// reads always go to read quorum (2) replicas
|
||||
int replicaCountToRead = 2;
|
||||
|
@ -611,14 +612,14 @@ namespace Microsoft.Azure.Cosmos
|
|||
for (int i = 0; i < addressInformation.Length; i++)
|
||||
{
|
||||
TransportClient mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, false);
|
||||
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer);
|
||||
ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
|
||||
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false);
|
||||
ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
|
||||
StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
|
||||
Assert.AreEqual(100, response.LSN);
|
||||
|
||||
//globalCommittedLsn never catches up in this case
|
||||
mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, true, false, false);
|
||||
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
|
||||
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
|
||||
try
|
||||
{
|
||||
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
|
||||
|
@ -629,17 +630,17 @@ namespace Microsoft.Azure.Cosmos
|
|||
}
|
||||
|
||||
mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, false);
|
||||
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
|
||||
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
|
||||
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
|
||||
Assert.AreEqual(100, response.LSN);
|
||||
|
||||
mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, true);
|
||||
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
|
||||
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
|
||||
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
|
||||
Assert.AreEqual(100, response.LSN);
|
||||
|
||||
mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, true);
|
||||
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
|
||||
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
|
||||
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
|
||||
Assert.AreEqual(100, response.LSN);
|
||||
}
|
||||
|
@ -703,7 +704,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
new StoreReader(mockTransportClient,
|
||||
addressSelector,
|
||||
new AddressEnumerator(),
|
||||
sessionContainer);
|
||||
sessionContainer,
|
||||
false);
|
||||
|
||||
Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
|
||||
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
|
||||
|
@ -746,7 +748,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
new StoreReader(mockTransportClient,
|
||||
addressSelector,
|
||||
new AddressEnumerator(),
|
||||
sessionContainer);
|
||||
sessionContainer,
|
||||
false);
|
||||
|
||||
Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
|
||||
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
|
||||
|
@ -798,7 +801,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
new StoreReader(mockTransportClient,
|
||||
addressSelector,
|
||||
new AddressEnumerator(),
|
||||
sessionContainer);
|
||||
sessionContainer,
|
||||
false);
|
||||
|
||||
Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
|
||||
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
|
||||
|
|
Загрузка…
Ссылка в новой задаче