Routing: Adds Parallel Request Hedging (#4198)
* initial commit * fix * document client restore * document client changes * clientContextCore fix * global endpoint manager fix * pre test changes * start of tests * added dispose for cancellation token source * test changes * working test * more testing * removed unneeded changes * revert changes to global endpoint manager (unneeded) * requested changes * requested changes * moves logic into availability strategy * adds disableStrategy type * fixed test * refactor should hedge * refactor should hedge/adds can use availability strat * fixed mocking test * Update Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs Co-authored-by: Matias Quaranta <ealsur@users.noreply.github.com> * requested changes * fix enabled * added preview flag * fixed XML * fixed preview flags * fixed bugs * nit * changed preview flags + update contracts * revert file for whitespace * changed file back after update contract * removed using * requested changes * fixed small bug * fixed bug * removed options from client builder * removed usings * constructor check * fixed test * added exclude region to logs * lazy task create * rename + lazy * improvements and fixes * Added XML commentes * fixed xml comments * small tweeks * added fixes + tests * added item check to tests * changed test regions to match with CI accounts * query test * update test for multiregion CI pipelines * enviroment var null check * null checks * perf tests * revert benchmark project * possible memory saving * Tests/improvements * extensive testing improvements + bug fixes * removed unneeded changes * nits * fix hedge regions * update contracts * fix updatecontract * test fix * fixed areequal asserts * ALTERNATE METHOD * added readfeed FI operation type to tests * requested changes and improvemtns * list optimization * fixed edge case diagnostics * small fix * small fixes * refactor code * fixed null issues * null refrence * bug fixes * changed header clone to internal * fixed API doc + test change * removed unused method * changed to internal * fixed internal * removed contract changes * updated abstract class * suggested changes * small bug fixes/improvements * nits and fixes * removed unused method * test fix + applicable region fix + error handeling * fixed test * fixed test * location cache change * requested changes * fixed test * nits * headers change * Update Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs Co-authored-by: Matias Quaranta <ealsur@users.noreply.github.com> * simplifed method * nit * fixed change * remove linq * fixed faulty change * reverted accidental test change * removed tooManyRequests test - unlrelated bug * removed tooManyRequests test - unlrelated bug --------- Co-authored-by: Matias Quaranta <ealsur@users.noreply.github.com>
This commit is contained in:
Родитель
b478595f17
Коммит
3fd2ce6fe8
|
@ -13,7 +13,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
using System.Net.Http;
|
||||
using System.Net.Security;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Microsoft.Azure.Cosmos.Fluent;
|
||||
using Microsoft.Azure.Cosmos.Fluent;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.Azure.Documents.Client;
|
||||
using Newtonsoft.Json;
|
||||
|
@ -650,7 +650,12 @@ namespace Microsoft.Azure.Cosmos
|
|||
|
||||
this.httpClientFactory = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Availability Strategy to be used for periods of high latency
|
||||
/// </summary>
|
||||
internal AvailabilityStrategy AvailabilityStrategy { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Enable partition key level failover
|
||||
|
|
|
@ -114,6 +114,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
|
||||
private readonly bool IsLocalQuorumConsistency = false;
|
||||
private readonly bool isReplicaAddressValidationEnabled;
|
||||
private readonly AvailabilityStrategy availabilityStrategy;
|
||||
|
||||
//Fault Injection
|
||||
private readonly IChaosInterceptorFactory chaosInterceptorFactory;
|
||||
|
@ -439,6 +440,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
/// <param name="cosmosClientId"></param>
|
||||
/// <param name="remoteCertificateValidationCallback">This delegate responsible for validating the third party certificate. </param>
|
||||
/// <param name="cosmosClientTelemetryOptions">This is distributed tracing flag</param>
|
||||
/// <param name="availabilityStrategy">This is the availability strategy for the client</param>"
|
||||
/// <param name="chaosInterceptorFactory">This is the chaos interceptor used for fault injection</param>
|
||||
/// <remarks>
|
||||
/// The service endpoint can be obtained from the Azure Management Portal.
|
||||
|
@ -468,6 +470,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
string cosmosClientId = null,
|
||||
RemoteCertificateValidationCallback remoteCertificateValidationCallback = null,
|
||||
CosmosClientTelemetryOptions cosmosClientTelemetryOptions = null,
|
||||
AvailabilityStrategy availabilityStrategy = null,
|
||||
IChaosInterceptorFactory chaosInterceptorFactory = null)
|
||||
{
|
||||
if (sendingRequestEventArgs != null)
|
||||
|
@ -491,6 +494,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
this.transportClientHandlerFactory = transportClientHandlerFactory;
|
||||
this.IsLocalQuorumConsistency = isLocalQuorumConsistency;
|
||||
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
|
||||
this.availabilityStrategy = availabilityStrategy;
|
||||
this.chaosInterceptorFactory = chaosInterceptorFactory;
|
||||
this.chaosInterceptor = chaosInterceptorFactory?.CreateInterceptor(this);
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ namespace Microsoft.Azure.Cosmos.Fluent
|
|||
using System.Threading.Tasks;
|
||||
using global::Azure;
|
||||
using global::Azure.Core;
|
||||
using Microsoft.Azure.Cosmos.Core.Trace;
|
||||
using Microsoft.Azure.Cosmos.Core.Trace;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.Azure.Documents.Client;
|
||||
|
||||
|
@ -683,6 +683,17 @@ namespace Microsoft.Azure.Cosmos.Fluent
|
|||
{
|
||||
this.clientOptions.ApiType = apiType;
|
||||
return this;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Availability Stragey to be used for periods of high latency
|
||||
/// </summary>
|
||||
/// <param name="strategy"></param>
|
||||
/// <returns>The CosmosClientBuilder</returns>
|
||||
internal CosmosClientBuilder WithAvailibilityStrategy(AvailabilityStrategy strategy)
|
||||
{
|
||||
this.clientOptions.AvailabilityStrategy = strategy;
|
||||
return this;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
|
@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.Handlers
|
|||
using System.Net.Http;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Cosmos.Diagnostics;
|
||||
using Microsoft.Azure.Cosmos.Routing;
|
||||
using Microsoft.Azure.Cosmos.Tracing;
|
||||
using Microsoft.Azure.Documents;
|
||||
|
@ -38,6 +39,7 @@ namespace Microsoft.Azure.Cosmos.Handlers
|
|||
Cosmos.PriorityLevel? requestedClientPriorityLevel)
|
||||
{
|
||||
this.client = client;
|
||||
|
||||
this.RequestedClientConsistencyLevel = requestedClientConsistencyLevel;
|
||||
this.RequestedClientPriorityLevel = requestedClientPriorityLevel;
|
||||
}
|
||||
|
@ -52,11 +54,8 @@ namespace Microsoft.Azure.Cosmos.Handlers
|
|||
}
|
||||
|
||||
RequestOptions promotedRequestOptions = request.RequestOptions;
|
||||
if (promotedRequestOptions != null)
|
||||
{
|
||||
// Fill request options
|
||||
promotedRequestOptions.PopulateRequestOptions(request);
|
||||
}
|
||||
// Fill request options
|
||||
promotedRequestOptions?.PopulateRequestOptions(request);
|
||||
|
||||
// Adds the NoContent header if not already added based on Client Level flag
|
||||
if (RequestInvokerHandler.ShouldSetNoContentResponseHeaders(
|
||||
|
@ -79,7 +78,44 @@ namespace Microsoft.Azure.Cosmos.Handlers
|
|||
|
||||
await request.AssertPartitioningDetailsAsync(this.client, cancellationToken, request.Trace);
|
||||
this.FillMultiMasterContext(request);
|
||||
return await base.SendAsync(request, cancellationToken);
|
||||
|
||||
AvailabilityStrategy strategy = this.AvailabilityStrategy(request);
|
||||
|
||||
if (strategy != null && strategy.Enabled())
|
||||
{
|
||||
return await strategy.ExecuteAvailabilityStrategyAsync(
|
||||
this.BaseSendAsync,
|
||||
this.client,
|
||||
request,
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
return await this.BaseSendAsync(request, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This method determines if there is an availability strategy that the request can use.
|
||||
/// Note that the request level availability strategy options override the client level options.
|
||||
/// </summary>
|
||||
/// <param name="request"></param>
|
||||
/// <returns>whether the request should be a parallel hedging request.</returns>
|
||||
public AvailabilityStrategy AvailabilityStrategy(RequestMessage request)
|
||||
{
|
||||
return request.RequestOptions?.AvailabilityStrategy
|
||||
?? this.client.ClientOptions.AvailabilityStrategy;
|
||||
}
|
||||
|
||||
public virtual async Task<ResponseMessage> BaseSendAsync(
|
||||
RequestMessage request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ResponseMessage response = await base.SendAsync(request, cancellationToken);
|
||||
if (request.RequestOptions?.ExcludeRegions != null)
|
||||
{
|
||||
((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
public virtual async Task<T> SendAsync<T>(
|
||||
|
|
|
@ -5,10 +5,10 @@
|
|||
namespace Microsoft.Azure.Cosmos
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Threading;
|
||||
|
@ -62,6 +62,28 @@ namespace Microsoft.Azure.Cosmos
|
|||
this.Method = method;
|
||||
this.RequestUriString = requestUriString;
|
||||
this.Trace = trace ?? throw new ArgumentNullException(nameof(trace));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a <see cref="RequestMessage"/>, used for Clone() method.
|
||||
/// </summary>
|
||||
/// <param name="method">The http method</param>
|
||||
/// <param name="requestUriString">The requested URI</param>
|
||||
/// <param name="trace">The trace node to append traces to.</param>
|
||||
/// <param name="headers">The headers to use.</param>
|
||||
/// <param name="properties">The properties to use.</param>
|
||||
private RequestMessage(
|
||||
HttpMethod method,
|
||||
string requestUriString,
|
||||
ITrace trace,
|
||||
Headers headers,
|
||||
Dictionary<string, object> properties)
|
||||
{
|
||||
this.Method = method;
|
||||
this.RequestUriString = requestUriString;
|
||||
this.Trace = trace ?? throw new ArgumentNullException(nameof(trace));
|
||||
this.headers = new Lazy<Headers>(() => headers);
|
||||
this.properties = new Lazy<Dictionary<string, object>>(() => properties);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -286,6 +308,47 @@ namespace Microsoft.Azure.Cosmos
|
|||
this.DocumentServiceRequest.RequestContext.ExcludeRegions = this.RequestOptions?.ExcludeRegions;
|
||||
this.OnBeforeRequestHandler(this.DocumentServiceRequest);
|
||||
return this.DocumentServiceRequest;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Clone the request message
|
||||
/// </summary>
|
||||
/// <returns>a cloned copy of the RequestMessage</returns>
|
||||
internal RequestMessage Clone(ITrace newTrace, CloneableStream cloneContent)
|
||||
{
|
||||
RequestMessage clone = new RequestMessage(
|
||||
this.Method,
|
||||
this.RequestUriString,
|
||||
newTrace,
|
||||
this.Headers.Clone(),
|
||||
new Dictionary<string, object>(this.Properties));
|
||||
|
||||
if (this.Content != null && cloneContent != null)
|
||||
{
|
||||
clone.Content = cloneContent.Clone();
|
||||
}
|
||||
|
||||
if (this.RequestOptions != null)
|
||||
{
|
||||
clone.RequestOptions = this.RequestOptions.ShallowCopy();
|
||||
}
|
||||
|
||||
clone.ResourceType = this.ResourceType;
|
||||
|
||||
clone.OperationType = this.OperationType;
|
||||
|
||||
if (this.PartitionKeyRangeId != null)
|
||||
{
|
||||
clone.PartitionKeyRangeId = string.IsNullOrEmpty(this.PartitionKeyRangeId.CollectionRid)
|
||||
? new PartitionKeyRangeIdentity(this.PartitionKeyRangeId.PartitionKeyRangeId)
|
||||
: new PartitionKeyRangeIdentity(this.PartitionKeyRangeId.CollectionRid, this.PartitionKeyRangeId.PartitionKeyRangeId);
|
||||
}
|
||||
|
||||
clone.UseGatewayMode = this.UseGatewayMode;
|
||||
clone.ContainerId = this.ContainerId;
|
||||
clone.DatabaseId = this.DatabaseId;
|
||||
|
||||
return clone;
|
||||
}
|
||||
|
||||
private static Dictionary<string, object> CreateDictionary()
|
||||
|
|
|
@ -6,7 +6,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
{
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.Azure.Documents.Collections;
|
||||
|
@ -285,6 +285,11 @@ namespace Microsoft.Azure.Cosmos
|
|||
HttpResponseHeadersWrapper httpResponseHeaders => httpResponseHeaders,
|
||||
_ => new NameValueResponseHeaders(nameValueCollection),
|
||||
};
|
||||
}
|
||||
|
||||
internal Headers(CosmosMessageHeadersInternal cosmosMessageHeaders)
|
||||
{
|
||||
this.CosmosMessageHeaders = cosmosMessageHeaders;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -404,6 +409,21 @@ namespace Microsoft.Azure.Cosmos
|
|||
{
|
||||
return this.CosmosMessageHeaders.GetHeaderValue<T>(headerName);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Clones the current <see cref="Headers"/>.
|
||||
/// </summary>
|
||||
/// <returns>a cloned copy of the current <see cref="Headers"/></returns>
|
||||
internal Headers Clone()
|
||||
{
|
||||
Headers clone = new Headers();
|
||||
foreach (string key in this.CosmosMessageHeaders.AllKeys())
|
||||
{
|
||||
clone.Add(key, this.CosmosMessageHeaders.Get(key));
|
||||
}
|
||||
|
||||
return clone;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Enumerates all the HTTP headers names in the <see cref="Headers"/>.
|
||||
|
|
|
@ -5,9 +5,8 @@
|
|||
namespace Microsoft.Azure.Cosmos
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.Generic;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Telemetry;
|
||||
|
||||
/// <summary>
|
||||
/// The default cosmos request options
|
||||
|
@ -69,7 +68,15 @@ namespace Microsoft.Azure.Cosmos
|
|||
/// This can be used to route a request to a specific region by excluding all other regions.
|
||||
/// If all regions are excluded, then the request will be routed to the primary/hub region.
|
||||
/// </summary>
|
||||
public List<string> ExcludeRegions { get; set; }
|
||||
public List<string> ExcludeRegions { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Cosmos availability strategy.
|
||||
/// Availability strategy allows the SDK to send out additional cross region requests to help
|
||||
/// reduce latency and increase availability. Currently there is one type of availability strategy, parallel request hedging.
|
||||
/// If there is a globally enabled availability strategy, setting one in the request options will override the global one.
|
||||
/// </summary>
|
||||
internal AvailabilityStrategy AvailabilityStrategy { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the boolean to use effective partition key routing in the cosmos db request.
|
||||
|
|
|
@ -83,6 +83,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
cosmosClientId: cosmosClient.Id,
|
||||
remoteCertificateValidationCallback: ClientContextCore.SslCustomValidationCallBack(clientOptions.GetServerCertificateCustomValidationCallback()),
|
||||
cosmosClientTelemetryOptions: clientOptions.CosmosClientTelemetryOptions,
|
||||
availabilityStrategy: clientOptions.AvailabilityStrategy,
|
||||
chaosInterceptorFactory: clientOptions.ChaosInterceptorFactory);
|
||||
|
||||
return ClientContextCore.Create(
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
//------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//------------------------------------------------------------
|
||||
namespace Microsoft.Azure.Cosmos
|
||||
{
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Cosmos.Handlers;
|
||||
|
||||
/// <summary>
|
||||
/// Types of availability strategies supported
|
||||
/// </summary>
|
||||
internal abstract class AvailabilityStrategy
|
||||
{
|
||||
/// <summary>
|
||||
/// Execute the availability strategy
|
||||
/// </summary>
|
||||
/// <param name="sender"></param>
|
||||
/// <param name="client"></param>
|
||||
/// <param name="requestMessage"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns>The response from the service after the availability strategy is executed</returns>
|
||||
public abstract Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
|
||||
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
|
||||
CosmosClient client,
|
||||
RequestMessage requestMessage,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
internal abstract bool Enabled();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,323 @@
|
|||
//------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//------------------------------------------------------------
|
||||
namespace Microsoft.Azure.Cosmos
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Cosmos.Core.Trace;
|
||||
using Microsoft.Azure.Cosmos.Diagnostics;
|
||||
using Microsoft.Azure.Documents;
|
||||
|
||||
/// <summary>
|
||||
/// Parallel hedging availability strategy. Once threshold time is reached,
|
||||
/// the SDK will send out an additional request to a remote region in parallel
|
||||
/// if the first parallel request or the original has not returned after the step time,
|
||||
/// additional parallel requests will be sent out there is a response or all regions are exausted.
|
||||
/// </summary>
|
||||
internal class CrossRegionParallelHedgingAvailabilityStrategy : AvailabilityStrategy
|
||||
{
|
||||
private const string HedgeRegions = "Hedge Regions";
|
||||
private const string HedgeContext = "Hedge Context";
|
||||
private const string HedgeContextOriginalRequest = "Original Request";
|
||||
private const string HedgeContextHedgedRequest = "Hedged Request";
|
||||
|
||||
/// <summary>
|
||||
/// Latency threshold which activates the first region hedging
|
||||
/// </summary>
|
||||
public TimeSpan Threshold { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// When the SDK will send out additional hedging requests after the initial hedging request
|
||||
/// </summary>
|
||||
public TimeSpan ThresholdStep { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Constructor for parallel hedging availability strategy
|
||||
/// </summary>
|
||||
/// <param name="threshold"></param>
|
||||
/// <param name="thresholdStep"></param>
|
||||
public CrossRegionParallelHedgingAvailabilityStrategy(
|
||||
TimeSpan threshold,
|
||||
TimeSpan? thresholdStep)
|
||||
{
|
||||
if (threshold <= TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(threshold));
|
||||
}
|
||||
|
||||
if (thresholdStep <= TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(thresholdStep));
|
||||
}
|
||||
|
||||
this.Threshold = threshold;
|
||||
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
|
||||
}
|
||||
|
||||
internal override bool Enabled()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This method determines if the request should be sent with a parallel hedging availability strategy.
|
||||
/// This availability strategy can only be used if the request is a read-only request on a document request.
|
||||
/// </summary>
|
||||
/// <param name="request"></param>
|
||||
/// <returns>whether the request should be a parallel hedging request.</returns>
|
||||
internal bool ShouldHedge(RequestMessage request)
|
||||
{
|
||||
//Only use availability strategy for document point operations
|
||||
if (request.ResourceType != ResourceType.Document)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
//check to see if it is a not a read-only request
|
||||
if (!OperationTypeExtensions.IsReadOperation(request.OperationType))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Execute the parallel hedging availability strategy
|
||||
/// </summary>
|
||||
/// <param name="sender"></param>
|
||||
/// <param name="client"></param>
|
||||
/// <param name="request"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns>The response after executing cross region hedging</returns>
|
||||
public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
|
||||
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
|
||||
CosmosClient client,
|
||||
RequestMessage request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!this.ShouldHedge(request))
|
||||
{
|
||||
return await sender(request, cancellationToken);
|
||||
}
|
||||
|
||||
using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
|
||||
{
|
||||
using (CloneableStream clonedBody = (CloneableStream)(request.Content == null
|
||||
? null//new CloneableStream(new MemoryStream())
|
||||
: await StreamExtension.AsClonableStreamAsync(request.Content)))
|
||||
{
|
||||
IReadOnlyCollection<string> hedgeRegions = client.DocumentClient.GlobalEndpointManager
|
||||
.GetApplicableRegions(
|
||||
request.RequestOptions?.ExcludeRegions,
|
||||
OperationTypeExtensions.IsReadOperation(request.OperationType));
|
||||
|
||||
List<Task> requestTasks = new List<Task>(hedgeRegions.Count + 1);
|
||||
|
||||
Task<(bool, ResponseMessage)> primaryRequest = null;
|
||||
|
||||
ResponseMessage responseMessage = null;
|
||||
|
||||
//Send out hedged requests
|
||||
for (int requestNumber = 0; requestNumber < hedgeRegions.Count; requestNumber++)
|
||||
{
|
||||
TimeSpan awaitTime = requestNumber == 0 ? this.Threshold : this.ThresholdStep;
|
||||
|
||||
using (CancellationTokenSource timerTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
|
||||
{
|
||||
CancellationToken timerToken = timerTokenSource.Token;
|
||||
using (Task hedgeTimer = Task.Delay(awaitTime, timerToken))
|
||||
{
|
||||
if (requestNumber == 0)
|
||||
{
|
||||
primaryRequest = this.RequestSenderAndResultCheckAsync(
|
||||
sender,
|
||||
request,
|
||||
cancellationToken,
|
||||
cancellationTokenSource);
|
||||
|
||||
requestTasks.Add(primaryRequest);
|
||||
}
|
||||
else
|
||||
{
|
||||
Task<(bool, ResponseMessage)> requestTask = this.CloneAndSendAsync(
|
||||
sender: sender,
|
||||
request: request,
|
||||
clonedBody: clonedBody,
|
||||
hedgeRegions: hedgeRegions,
|
||||
requestNumber: requestNumber,
|
||||
cancellationToken: cancellationToken,
|
||||
cancellationTokenSource: cancellationTokenSource);
|
||||
|
||||
requestTasks.Add(requestTask);
|
||||
}
|
||||
|
||||
requestTasks.Add(hedgeTimer);
|
||||
|
||||
Task completedTask = await Task.WhenAny(requestTasks);
|
||||
requestTasks.Remove(completedTask);
|
||||
|
||||
if (completedTask == hedgeTimer)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
timerTokenSource.Cancel();
|
||||
requestTasks.Remove(hedgeTimer);
|
||||
|
||||
if (completedTask.IsFaulted)
|
||||
{
|
||||
AggregateException innerExceptions = completedTask.Exception.Flatten();
|
||||
}
|
||||
|
||||
(bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask;
|
||||
if (isNonTransient)
|
||||
{
|
||||
cancellationTokenSource.Cancel();
|
||||
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
HedgeRegions,
|
||||
HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions()));
|
||||
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
HedgeContext,
|
||||
object.ReferenceEquals(primaryRequest, completedTask)
|
||||
? HedgeContextOriginalRequest
|
||||
: HedgeContextHedgedRequest);
|
||||
return responseMessage;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Wait for a good response from the hedged requests/primary request
|
||||
Exception lastException = null;
|
||||
while (requestTasks.Any())
|
||||
{
|
||||
Task completedTask = await Task.WhenAny(requestTasks);
|
||||
requestTasks.Remove(completedTask);
|
||||
if (completedTask.IsFaulted)
|
||||
{
|
||||
AggregateException innerExceptions = completedTask.Exception.Flatten();
|
||||
lastException = innerExceptions.InnerExceptions.FirstOrDefault();
|
||||
}
|
||||
|
||||
(bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask;
|
||||
if (isNonTransient || requestTasks.Count == 0)
|
||||
{
|
||||
cancellationTokenSource.Cancel();
|
||||
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
HedgeRegions,
|
||||
HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions()));
|
||||
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
HedgeContext,
|
||||
object.ReferenceEquals(primaryRequest, completedTask)
|
||||
? HedgeContextOriginalRequest
|
||||
: HedgeContextHedgedRequest);
|
||||
return responseMessage;
|
||||
}
|
||||
}
|
||||
|
||||
if (lastException != null)
|
||||
{
|
||||
throw lastException;
|
||||
}
|
||||
|
||||
Debug.Assert(responseMessage != null);
|
||||
return responseMessage;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<(bool, ResponseMessage)> CloneAndSendAsync(
|
||||
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
|
||||
RequestMessage request,
|
||||
CloneableStream clonedBody,
|
||||
IReadOnlyCollection<string> hedgeRegions,
|
||||
int requestNumber,
|
||||
CancellationToken cancellationToken,
|
||||
CancellationTokenSource cancellationTokenSource)
|
||||
{
|
||||
RequestMessage clonedRequest;
|
||||
using (clonedRequest = request.Clone(request.Trace.Parent, clonedBody))
|
||||
{
|
||||
clonedRequest.RequestOptions ??= new RequestOptions();
|
||||
|
||||
List<string> excludeRegions = new List<string>(hedgeRegions);
|
||||
excludeRegions.RemoveAt(requestNumber);
|
||||
clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;
|
||||
|
||||
return await this.RequestSenderAndResultCheckAsync(
|
||||
sender,
|
||||
clonedRequest,
|
||||
cancellationToken,
|
||||
cancellationTokenSource);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<(bool, ResponseMessage)> RequestSenderAndResultCheckAsync(
|
||||
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
|
||||
RequestMessage request,
|
||||
CancellationToken cancellationToken,
|
||||
CancellationTokenSource cancellationTokenSource)
|
||||
{
|
||||
try
|
||||
{
|
||||
ResponseMessage response = await sender.Invoke(request, cancellationToken);
|
||||
if (IsFinalResult((int)response.StatusCode, (int)response.Headers.SubStatusCode))
|
||||
{
|
||||
if (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
cancellationTokenSource.Cancel();
|
||||
}
|
||||
return (true, response);
|
||||
}
|
||||
|
||||
return (false, response);
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested)
|
||||
{
|
||||
return (false, null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
DefaultTrace.TraceError("Exception thrown while executing cross region hedging availability strategy: {0}", ex);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
private static bool IsFinalResult(int statusCode, int subStatusCode)
|
||||
{
|
||||
//All 1xx, 2xx, and 3xx status codes should be treated as final results
|
||||
if (statusCode < (int)HttpStatusCode.BadRequest)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
//Status codes that indicate non-transient timeouts
|
||||
if (statusCode == (int)HttpStatusCode.BadRequest
|
||||
|| statusCode == (int)HttpStatusCode.Conflict
|
||||
|| statusCode == (int)HttpStatusCode.MethodNotAllowed
|
||||
|| statusCode == (int)HttpStatusCode.PreconditionFailed
|
||||
|| statusCode == (int)HttpStatusCode.RequestEntityTooLarge
|
||||
|| statusCode == (int)HttpStatusCode.Unauthorized)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
//404 - Not found is a final result as the document was not yet available
|
||||
//after enforcing the consistency model
|
||||
//All other errors should be treated as possibly transient errors
|
||||
return statusCode == (int)HttpStatusCode.NotFound && subStatusCode == (int)SubStatusCodes.Unknown;
|
||||
}
|
||||
|
||||
private static string HedgeRegionsToString(IReadOnlyList<(string, Uri)> hedgeRegions)
|
||||
{
|
||||
return string.Join(",", hedgeRegions);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
//------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//------------------------------------------------------------
|
||||
namespace Microsoft.Azure.Cosmos
|
||||
{
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
/// <summary>
|
||||
/// A Disabled availability strategy that does not do anything. Used for overriding the default global availability strategy.
|
||||
/// </summary>
|
||||
internal class DisabledAvailabilityStrategy : AvailabilityStrategy
|
||||
{
|
||||
internal override bool Enabled()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This method is not implemented and will throw an exception if called.
|
||||
/// </summary>
|
||||
/// <param name="sender"></param>
|
||||
/// <param name="client"></param>
|
||||
/// <param name="requestMessage"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns>nothing, this will throw.</returns>
|
||||
public override Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
|
||||
Func<RequestMessage,
|
||||
CancellationToken,
|
||||
Task<ResponseMessage>> sender,
|
||||
CosmosClient client,
|
||||
RequestMessage requestMessage,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -104,8 +104,8 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
|
||||
public Uri GetHubUri()
|
||||
{
|
||||
return this.locationCache.GetHubUri();
|
||||
}
|
||||
return this.locationCache.GetHubUri();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This will get the account information.
|
||||
|
@ -447,6 +447,11 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
{
|
||||
return this.locationCache.GetApplicableEndpoints(request, isReadRequest);
|
||||
}
|
||||
|
||||
public ReadOnlyCollection<string> GetApplicableRegions(IEnumerable<string> excludeRegions, bool isReadRequest)
|
||||
{
|
||||
return this.locationCache.GetApplicableRegions(excludeRegions, isReadRequest);
|
||||
}
|
||||
|
||||
public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName)
|
||||
{
|
||||
|
|
|
@ -257,6 +257,11 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
string writeLocation = currentLocationInfo.AvailableWriteLocations[0];
|
||||
Uri locationEndpointToRoute = currentLocationInfo.AvailableWriteEndpointByLocation[writeLocation];
|
||||
return locationEndpointToRoute;
|
||||
}
|
||||
|
||||
public ReadOnlyCollection<string> GetAvailableReadLocations()
|
||||
{
|
||||
return this.locationInfo.AvailableReadLocations;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -315,7 +320,10 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
|
||||
public ReadOnlyCollection<Uri> GetApplicableEndpoints(DocumentServiceRequest request, bool isReadRequest)
|
||||
{
|
||||
ReadOnlyCollection<Uri> endpoints = isReadRequest ? this.ReadEndpoints : this.WriteEndpoints;
|
||||
ReadOnlyCollection<Uri> endpoints =
|
||||
isReadRequest
|
||||
? this.ReadEndpoints
|
||||
: this.WriteEndpoints;
|
||||
|
||||
if (request.RequestContext.ExcludeRegions == null || request.RequestContext.ExcludeRegions.Count == 0)
|
||||
{
|
||||
|
@ -323,52 +331,107 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
}
|
||||
|
||||
return this.GetApplicableEndpoints(
|
||||
endpoints,
|
||||
isReadRequest ? this.locationInfo.AvailableReadEndpointByLocation : this.locationInfo.AvailableWriteEndpointByLocation,
|
||||
this.defaultEndpoint,
|
||||
request.RequestContext.ExcludeRegions);
|
||||
}
|
||||
|
||||
public ReadOnlyCollection<string> GetApplicableRegions(IEnumerable<string> excludeRegions, bool isReadRequest)
|
||||
{
|
||||
return this.GetApplicableRegions(
|
||||
isReadRequest ? this.locationInfo.AvailableReadLocations : this.locationInfo.AvailableWriteLocations,
|
||||
this.locationInfo.PreferredLocations[0],
|
||||
excludeRegions);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint
|
||||
/// </summary>
|
||||
/// <param name="endpoints"></param>
|
||||
/// <param name="regionNameByEndpoint"></param>
|
||||
/// <param name="fallbackEndpoint"></param>
|
||||
/// <param name="excludeRegions"></param>
|
||||
/// <returns>a list of applicable endpoints for a request</returns>
|
||||
private ReadOnlyCollection<Uri> GetApplicableEndpoints(
|
||||
IReadOnlyList<Uri> endpoints,
|
||||
ReadOnlyDictionary<string, Uri> regionNameByEndpoint,
|
||||
Uri fallbackEndpoint,
|
||||
IReadOnlyList<string> excludeRegions)
|
||||
IEnumerable<string> excludeRegions)
|
||||
{
|
||||
List<Uri> applicableEndpoints = new List<Uri>(regionNameByEndpoint.Count);
|
||||
HashSet<string> excludeRegionsHash = excludeRegions == null ? null : new HashSet<string>(excludeRegions);
|
||||
|
||||
if (excludeRegions != null)
|
||||
{
|
||||
foreach (string region in this.locationInfo.PreferredLocations)
|
||||
{
|
||||
if (!excludeRegionsHash.Contains(region)
|
||||
&& regionNameByEndpoint.TryGetValue(region, out Uri endpoint))
|
||||
{
|
||||
applicableEndpoints.Add(endpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (string region in this.locationInfo.PreferredLocations)
|
||||
{
|
||||
if (regionNameByEndpoint.TryGetValue(region, out Uri endpoint))
|
||||
{
|
||||
applicableEndpoints.Add(endpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (applicableEndpoints.Count == 0)
|
||||
{
|
||||
applicableEndpoints.Add(fallbackEndpoint);
|
||||
}
|
||||
|
||||
return new ReadOnlyCollection<Uri>(applicableEndpoints);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets applicable endpoints for a request, if there are no applicable endpoints, returns the fallback endpoint
|
||||
/// </summary>
|
||||
/// <param name="regionNameByEndpoint"></param>
|
||||
/// <param name="fallbackRegion"></param>
|
||||
/// <param name="excludeRegions"></param>
|
||||
/// <returns>a list of applicable endpoints for a request</returns>
|
||||
private ReadOnlyCollection<string> GetApplicableRegions(
|
||||
ReadOnlyCollection<string> regionNameByEndpoint,
|
||||
string fallbackRegion,
|
||||
IEnumerable<string> excludeRegions)
|
||||
{
|
||||
List<Uri> applicableEndpoints = new List<Uri>(endpoints.Count);
|
||||
HashSet<Uri> excludeUris = new HashSet<Uri>();
|
||||
|
||||
foreach (string region in excludeRegions)
|
||||
{
|
||||
string normalizedRegionName = this.regionNameMapper.GetCosmosDBRegionName(region);
|
||||
if (regionNameByEndpoint.ContainsKey(normalizedRegionName))
|
||||
{
|
||||
excludeUris.Add(regionNameByEndpoint[normalizedRegionName]);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (Uri endpoint in endpoints)
|
||||
{
|
||||
if (!excludeUris.Contains(endpoint))
|
||||
{
|
||||
applicableEndpoints.Add(endpoint);
|
||||
}
|
||||
List<string> applicableRegions = new List<string>(regionNameByEndpoint.Count);
|
||||
HashSet<string> excludeRegionsHash = excludeRegions == null ? null : new HashSet<string>(excludeRegions);
|
||||
|
||||
if (excludeRegions != null)
|
||||
{
|
||||
foreach (string region in this.locationInfo.PreferredLocations)
|
||||
{
|
||||
if (regionNameByEndpoint.Contains(region)
|
||||
&& !excludeRegionsHash.Contains(region))
|
||||
{
|
||||
applicableRegions.Add(region);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (string region in this.locationInfo.PreferredLocations)
|
||||
{
|
||||
if (regionNameByEndpoint.Contains(region))
|
||||
{
|
||||
applicableRegions.Add(region);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (applicableEndpoints.Count == 0)
|
||||
if (applicableRegions.Count == 0)
|
||||
{
|
||||
applicableEndpoints.Add(fallbackEndpoint);
|
||||
applicableRegions.Add(fallbackRegion);
|
||||
}
|
||||
|
||||
return new ReadOnlyCollection<Uri>(applicableEndpoints);
|
||||
return new ReadOnlyCollection<string>(applicableRegions);
|
||||
}
|
||||
|
||||
public bool ShouldRefreshEndpoints(out bool canRefreshInBackground)
|
||||
|
|
|
@ -0,0 +1,891 @@
|
|||
|
||||
namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Data;
|
||||
using System.IO;
|
||||
using System.Net.Http;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using global::Azure.Core.Serialization;
|
||||
using Microsoft.Azure.Cosmos;
|
||||
using Microsoft.Azure.Cosmos.Diagnostics;
|
||||
using Microsoft.Azure.Cosmos.FaultInjection;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
using Database = Database;
|
||||
using PartitionKey = PartitionKey;
|
||||
|
||||
[TestClass]
|
||||
public class CosmosAvailabilityStrategyTests
|
||||
{
|
||||
|
||||
private CosmosClient client;
|
||||
private Database database;
|
||||
private Container container;
|
||||
private Container changeFeedContainer;
|
||||
private CosmosSystemTextJsonSerializer cosmosSystemTextJsonSerializer;
|
||||
private string connectionString;
|
||||
private string dbName;
|
||||
private string containerName;
|
||||
private string changeFeedContainerName;
|
||||
|
||||
[TestCleanup]
|
||||
public async Task TestCleanup()
|
||||
{
|
||||
await this.database?.DeleteAsync();
|
||||
this.client?.Dispose();
|
||||
}
|
||||
|
||||
private static readonly FaultInjectionCondition readConditon = new FaultInjectionConditionBuilder()
|
||||
.WithRegion("Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.ReadItem)
|
||||
.Build();
|
||||
private static readonly FaultInjectionCondition queryConditon = new FaultInjectionConditionBuilder()
|
||||
.WithRegion("Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.QueryItem)
|
||||
.Build();
|
||||
private static readonly FaultInjectionCondition readManyCondition = new FaultInjectionConditionBuilder()
|
||||
.WithRegion("Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.QueryItem)
|
||||
.Build();
|
||||
private static readonly FaultInjectionCondition changeFeedCondtion = new FaultInjectionConditionBuilder()
|
||||
.WithRegion("Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.All)
|
||||
.Build();
|
||||
|
||||
private static readonly FaultInjectionCondition readConditonStep = new FaultInjectionConditionBuilder()
|
||||
.WithRegion("North Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.ReadItem)
|
||||
.Build();
|
||||
private static readonly FaultInjectionCondition queryConditonStep = new FaultInjectionConditionBuilder()
|
||||
.WithRegion("North Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.QueryItem)
|
||||
.Build();
|
||||
private static readonly FaultInjectionCondition readManyConditionStep = new FaultInjectionConditionBuilder()
|
||||
.WithRegion("North Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.QueryItem)
|
||||
.Build();
|
||||
private static readonly FaultInjectionCondition changeFeedCondtionStep = new FaultInjectionConditionBuilder()
|
||||
.WithRegion("North Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.ReadFeed)
|
||||
.Build();
|
||||
|
||||
private static readonly IFaultInjectionResult goneResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.Gone)
|
||||
.Build();
|
||||
private static readonly IFaultInjectionResult retryWithResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.RetryWith)
|
||||
.Build();
|
||||
private static readonly IFaultInjectionResult internalServerErrorResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.InternalServerEror)
|
||||
.Build();
|
||||
private static readonly IFaultInjectionResult readSessionNotAvailableResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.ReadSessionNotAvailable)
|
||||
.Build();
|
||||
private static readonly IFaultInjectionResult timeoutResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.Timeout)
|
||||
.Build();
|
||||
private static readonly IFaultInjectionResult partitionIsSplittingResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.PartitionIsSplitting)
|
||||
.Build();
|
||||
private static readonly IFaultInjectionResult partitionIsMigratingResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.PartitionIsMigrating)
|
||||
.Build();
|
||||
private static readonly IFaultInjectionResult serviceUnavailableResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.ServiceUnavailable)
|
||||
.Build();
|
||||
private static readonly IFaultInjectionResult responseDelayResult = FaultInjectionResultBuilder
|
||||
.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
|
||||
.WithDelay(TimeSpan.FromMilliseconds(4000))
|
||||
.Build();
|
||||
|
||||
private readonly Dictionary<string, FaultInjectionCondition> conditions = new Dictionary<string, FaultInjectionCondition>()
|
||||
{
|
||||
{ "Read", readConditon },
|
||||
{ "Query", queryConditon },
|
||||
{ "ReadMany", readManyCondition },
|
||||
{ "ChangeFeed", changeFeedCondtion },
|
||||
{ "ReadStep", readConditonStep },
|
||||
{ "QueryStep", queryConditonStep },
|
||||
{ "ReadManyStep", readManyConditionStep },
|
||||
{ "ChangeFeedStep", changeFeedCondtionStep}
|
||||
};
|
||||
|
||||
private readonly Dictionary<string, IFaultInjectionResult> results = new Dictionary<string, IFaultInjectionResult>()
|
||||
{
|
||||
{ "Gone", goneResult },
|
||||
{ "RetryWith", retryWithResult },
|
||||
{ "InternalServerError", internalServerErrorResult },
|
||||
{ "ReadSessionNotAvailable", readSessionNotAvailableResult },
|
||||
{ "Timeout", timeoutResult },
|
||||
{ "PartitionIsSplitting", partitionIsSplittingResult },
|
||||
{ "PartitionIsMigrating", partitionIsMigratingResult },
|
||||
{ "ServiceUnavailable", serviceUnavailableResult },
|
||||
{ "ResponseDelay", responseDelayResult }
|
||||
};
|
||||
|
||||
[TestInitialize]
|
||||
public async Task TestInitAsync()
|
||||
{
|
||||
this.connectionString = ConfigurationManager.GetEnvironmentVariable<string>("COSMOSDB_MULTI_REGION", null);
|
||||
|
||||
JsonSerializerOptions jsonSerializerOptions = new JsonSerializerOptions()
|
||||
{
|
||||
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
|
||||
};
|
||||
this.cosmosSystemTextJsonSerializer = new CosmosSystemTextJsonSerializer(jsonSerializerOptions);
|
||||
|
||||
if (string.IsNullOrEmpty(this.connectionString))
|
||||
{
|
||||
Assert.Fail("Set environment variable COSMOSDB_MULTI_REGION to run the tests");
|
||||
}
|
||||
this.client = new CosmosClient(
|
||||
this.connectionString,
|
||||
new CosmosClientOptions()
|
||||
{
|
||||
Serializer = this.cosmosSystemTextJsonSerializer,
|
||||
});
|
||||
|
||||
this.dbName = Guid.NewGuid().ToString();
|
||||
this.containerName = Guid.NewGuid().ToString();
|
||||
this.changeFeedContainerName = Guid.NewGuid().ToString();
|
||||
this.database = await this.client.CreateDatabaseIfNotExistsAsync(this.dbName);
|
||||
this.container = await this.database.CreateContainerIfNotExistsAsync(this.containerName, "/pk");
|
||||
this.changeFeedContainer = await this.database.CreateContainerIfNotExistsAsync(this.changeFeedContainerName, "/partitionKey");
|
||||
|
||||
await this.container.CreateItemAsync<AvailabilityStrategyTestObject>(new AvailabilityStrategyTestObject { Id = "testId", Pk = "pk" });
|
||||
await this.container.CreateItemAsync<AvailabilityStrategyTestObject>(new AvailabilityStrategyTestObject { Id = "testId2", Pk = "pk2" });
|
||||
await this.container.CreateItemAsync<AvailabilityStrategyTestObject>(new AvailabilityStrategyTestObject { Id = "testId3", Pk = "pk3" });
|
||||
await this.container.CreateItemAsync<AvailabilityStrategyTestObject>(new AvailabilityStrategyTestObject { Id = "testId4", Pk = "pk4" });
|
||||
|
||||
//Must Ensure the data is replicated to all regions
|
||||
await Task.Delay(60000);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[TestCategory("MultiRegion")]
|
||||
public async Task AvailabilityStrategyNoTriggerTest()
|
||||
{
|
||||
FaultInjectionRule responseDelay = new FaultInjectionRuleBuilder(
|
||||
id: "responseDely",
|
||||
condition:
|
||||
new FaultInjectionConditionBuilder()
|
||||
.WithRegion("Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.ReadItem)
|
||||
.Build(),
|
||||
result:
|
||||
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
|
||||
.WithDelay(TimeSpan.FromMilliseconds(300))
|
||||
.Build())
|
||||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.Build();
|
||||
|
||||
List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { responseDelay };
|
||||
FaultInjector faultInjector = new FaultInjector(rules);
|
||||
|
||||
responseDelay.Disable();
|
||||
|
||||
CosmosClientOptions clientOptions = new CosmosClientOptions()
|
||||
{
|
||||
ConnectionMode = ConnectionMode.Direct,
|
||||
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
|
||||
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
|
||||
threshold: TimeSpan.FromMilliseconds(1500),
|
||||
thresholdStep: TimeSpan.FromMilliseconds(50)),
|
||||
Serializer = this.cosmosSystemTextJsonSerializer
|
||||
};
|
||||
|
||||
CosmosClient faultInjectionClient = new CosmosClient(
|
||||
connectionString: this.connectionString,
|
||||
clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions));
|
||||
|
||||
Database database = faultInjectionClient.GetDatabase(this.dbName);
|
||||
Container container = database.GetContainer(this.containerName);
|
||||
|
||||
responseDelay.Enable();
|
||||
ItemResponse<AvailabilityStrategyTestObject> ir = await container.ReadItemAsync<AvailabilityStrategyTestObject>("testId", new PartitionKey("pk"));
|
||||
|
||||
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Original Request", (string)hedgeContext);
|
||||
|
||||
faultInjectionClient.Dispose();
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[TestCategory("MultiRegion")]
|
||||
public async Task AvailabilityStrategyRequestOptionsTriggerTest()
|
||||
{
|
||||
FaultInjectionRule responseDelay = new FaultInjectionRuleBuilder(
|
||||
id: "responseDely",
|
||||
condition:
|
||||
new FaultInjectionConditionBuilder()
|
||||
.WithRegion("Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.ReadItem)
|
||||
.Build(),
|
||||
result:
|
||||
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
|
||||
.WithDelay(TimeSpan.FromMilliseconds(4000))
|
||||
.Build())
|
||||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.Build();
|
||||
|
||||
List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { responseDelay };
|
||||
FaultInjector faultInjector = new FaultInjector(rules);
|
||||
|
||||
responseDelay.Disable();
|
||||
|
||||
CosmosClientOptions clientOptions = new CosmosClientOptions()
|
||||
{
|
||||
ConnectionMode = ConnectionMode.Direct,
|
||||
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
|
||||
Serializer = this.cosmosSystemTextJsonSerializer
|
||||
};
|
||||
|
||||
CosmosClient faultInjectionClient = new CosmosClient(
|
||||
connectionString: this.connectionString,
|
||||
clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions));
|
||||
|
||||
Database database = faultInjectionClient.GetDatabase(this.dbName);
|
||||
Container container = database.GetContainer(this.containerName);
|
||||
|
||||
responseDelay.Enable();
|
||||
|
||||
ItemRequestOptions requestOptions = new ItemRequestOptions
|
||||
{
|
||||
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
|
||||
threshold: TimeSpan.FromMilliseconds(100),
|
||||
thresholdStep: TimeSpan.FromMilliseconds(50))
|
||||
};
|
||||
ItemResponse<AvailabilityStrategyTestObject> ir = await container.ReadItemAsync<AvailabilityStrategyTestObject>(
|
||||
"testId",
|
||||
new PartitionKey("pk"),
|
||||
requestOptions);
|
||||
|
||||
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject);
|
||||
Assert.IsNotNull(excludeRegionsObject);
|
||||
List<string> excludeRegionsList = excludeRegionsObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
faultInjectionClient.Dispose();
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[TestCategory("MultiRegion")]
|
||||
public async Task AvailabilityStrategyDisableOverideTest()
|
||||
{
|
||||
FaultInjectionRule responseDelay = new FaultInjectionRuleBuilder(
|
||||
id: "responseDely",
|
||||
condition:
|
||||
new FaultInjectionConditionBuilder()
|
||||
.WithRegion("Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.ReadItem)
|
||||
.Build(),
|
||||
result:
|
||||
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
|
||||
.WithDelay(TimeSpan.FromMilliseconds(6000))
|
||||
.Build())
|
||||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.WithHitLimit(2)
|
||||
.Build();
|
||||
|
||||
List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { responseDelay };
|
||||
FaultInjector faultInjector = new FaultInjector(rules);
|
||||
|
||||
responseDelay.Disable();
|
||||
|
||||
CosmosClientOptions clientOptions = new CosmosClientOptions()
|
||||
{
|
||||
ConnectionMode = ConnectionMode.Direct,
|
||||
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
|
||||
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
|
||||
threshold: TimeSpan.FromMilliseconds(100),
|
||||
thresholdStep: TimeSpan.FromMilliseconds(50)),
|
||||
Serializer = this.cosmosSystemTextJsonSerializer
|
||||
};
|
||||
|
||||
CosmosClient faultInjectionClient = new CosmosClient(
|
||||
connectionString: this.connectionString,
|
||||
clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions));
|
||||
|
||||
Database database = faultInjectionClient.GetDatabase(this.dbName);
|
||||
Container container = database.GetContainer(this.containerName);
|
||||
|
||||
responseDelay.Enable();
|
||||
ItemRequestOptions requestOptions = new ItemRequestOptions
|
||||
{
|
||||
AvailabilityStrategy = new DisabledAvailabilityStrategy()
|
||||
};
|
||||
|
||||
ItemResponse<AvailabilityStrategyTestObject> ir = await container.ReadItemAsync<AvailabilityStrategyTestObject>(
|
||||
"testId",
|
||||
new PartitionKey("pk"),
|
||||
requestOptions);
|
||||
|
||||
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
Assert.IsFalse(traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out _));
|
||||
|
||||
faultInjectionClient.Dispose();
|
||||
}
|
||||
|
||||
[DataTestMethod]
|
||||
[TestCategory("MultiRegion")]
|
||||
[DataRow("Read", "Read", "Gone", DisplayName = "Read | Gone")]
|
||||
[DataRow("Read", "Read", "RetryWith", DisplayName = "Read | RetryWith")]
|
||||
[DataRow("Read", "Read", "InternalServerError", DisplayName = "Read | InternalServerError")]
|
||||
[DataRow("Read", "Read", "ReadSessionNotAvailable", DisplayName = "Read | ReadSessionNotAvailable")]
|
||||
[DataRow("Read", "Read", "Timeout", DisplayName = "Read | Timeout")]
|
||||
[DataRow("Read", "Read", "PartitionIsSplitting", DisplayName = "Read | PartitionIsSplitting")]
|
||||
[DataRow("Read", "Read", "PartitionIsMigrating", DisplayName = "Read | PartitionIsMigrating")]
|
||||
[DataRow("Read", "Read", "ServiceUnavailable", DisplayName = "Read | ServiceUnavailable")]
|
||||
[DataRow("Read", "Read", "ResponseDelay", DisplayName = "Read | ResponseDelay")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "Gone", DisplayName = "SinglePartitionQuery | Gone")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "RetryWith", DisplayName = "SinglePartitionQuery | RetryWith")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "InternalServerError", DisplayName = "SinglePartitionQuery | InternalServerError")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "ReadSessionNotAvailable", DisplayName = "SinglePartitionQuery | ReadSessionNotAvailable")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "Timeout", DisplayName = "SinglePartitionQuery | Timeout")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "PartitionIsSplitting", DisplayName = "SinglePartitionQuery | PartitionIsSplitting")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "PartitionIsMigrating", DisplayName = "SinglePartitionQuery | PartitionIsMigrating")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "ServiceUnavailable", DisplayName = "SinglePartitionQuery | ServiceUnavailable")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "ResponseDelay", DisplayName = "SinglePartitionQuery | ResponseDelay")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "Gone", DisplayName = "CrossPartitionQuery | Gone")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "RetryWith", DisplayName = "CrossPartitionQuery | RetryWith")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "InternalServerError", DisplayName = "CrossPartitionQuery | InternalServerError")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "ReadSessionNotAvailable", DisplayName = "CrossPartitionQuery | ReadSessionNotAvailable")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "Timeout", DisplayName = "CrossPartitionQuery | Timeout")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "PartitionIsSplitting", DisplayName = "CrossPartitionQuery | PartitionIsSplitting")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "PartitionIsMigrating", DisplayName = "CrossPartitionQuery | PartitionIsMigrating")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "ServiceUnavailable", DisplayName = "CrossPartitionQuery | ServiceUnavailable")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "ResponseDelay", DisplayName = "CrossPartitionQuery | ResponseDelay")]
|
||||
[DataRow("ReadMany", "ReadMany", "Gone", DisplayName = "ReadMany | Gone")]
|
||||
[DataRow("ReadMany", "ReadMany", "RetryWith", DisplayName = "ReadMany | RetryWith")]
|
||||
[DataRow("ReadMany", "ReadMany", "InternalServerError", DisplayName = "ReadMany | InternalServerError")]
|
||||
[DataRow("ReadMany", "ReadMany", "ReadSessionNotAvailable", DisplayName = "ReadMany | ReadSessionNotAvailable")]
|
||||
[DataRow("ReadMany", "ReadMany", "Timeout", DisplayName = "ReadMany | Timeout")]
|
||||
[DataRow("ReadMany", "ReadMany", "PartitionIsSplitting", DisplayName = "ReadMany | PartitionIsSplitting")]
|
||||
[DataRow("ReadMany", "ReadMany", "PartitionIsMigrating", DisplayName = "ReadMany | PartitionIsMigrating")]
|
||||
[DataRow("ReadMany", "ReadMany", "ServiceUnavailable", DisplayName = "ReadMany | ServiceUnavailable")]
|
||||
[DataRow("ReadMany", "ReadMany", "ResponseDelay", DisplayName = "ReadMany | ResponseDelay")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "Gone", DisplayName = "ChangeFeed | Gone")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "RetryWith", DisplayName = "ChangeFeed | RetryWith")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "InternalServerError", DisplayName = "ChangeFeed | InternalServerError")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "ReadSessionNotAvailable", DisplayName = "ChangeFeed | ReadSessionNotAvailable")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "Timeout", DisplayName = "ChangeFeed | Timeout")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "PartitionIsSplitting", DisplayName = "ChangeFeed | PartitionIsSplitting")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "PartitionIsMigrating", DisplayName = "ChangeFeed | PartitionIsMigrating")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "ServiceUnavailable", DisplayName = "ChangeFeed | ServiceUnavailable")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "ResponseDelay", DisplayName = "ChangeFeed | ResponseDelay")]
|
||||
public async Task AvailabilityStrategyAllFaultsTests(string operation, string conditonName, string resultName)
|
||||
{
|
||||
FaultInjectionCondition conditon = this.conditions[conditonName];
|
||||
IFaultInjectionResult result = this.results[resultName];
|
||||
|
||||
FaultInjectionRule rule = new FaultInjectionRuleBuilder(
|
||||
id: operation,
|
||||
condition: conditon,
|
||||
result: result)
|
||||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.Build();
|
||||
|
||||
List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { rule };
|
||||
FaultInjector faultInjector = new FaultInjector(rules);
|
||||
|
||||
rule.Disable();
|
||||
|
||||
CosmosClientOptions clientOptions = new CosmosClientOptions()
|
||||
{
|
||||
ConnectionMode = ConnectionMode.Direct,
|
||||
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
|
||||
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
|
||||
threshold: TimeSpan.FromMilliseconds(100),
|
||||
thresholdStep: TimeSpan.FromMilliseconds(50)),
|
||||
Serializer = this.cosmosSystemTextJsonSerializer
|
||||
};
|
||||
|
||||
CosmosClient faultInjectionClient = new CosmosClient(
|
||||
connectionString: this.connectionString,
|
||||
clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions));
|
||||
|
||||
Database database = faultInjectionClient.GetDatabase(this.dbName);
|
||||
Container container = database.GetContainer(this.containerName);
|
||||
|
||||
CosmosTraceDiagnostics traceDiagnostic;
|
||||
object hedgeContext;
|
||||
List<string> excludeRegionsList;
|
||||
switch (operation)
|
||||
{
|
||||
case "Read":
|
||||
rule.Enable();
|
||||
|
||||
ItemResponse<AvailabilityStrategyTestObject> ir = await container.ReadItemAsync<AvailabilityStrategyTestObject>(
|
||||
"testId",
|
||||
new PartitionKey("pk"));
|
||||
|
||||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject);
|
||||
excludeRegionsList = excludeRegionsObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
|
||||
break;
|
||||
|
||||
case "SinglePartitionQuery":
|
||||
string queryString = "SELECT * FROM c";
|
||||
|
||||
QueryRequestOptions requestOptions = new QueryRequestOptions()
|
||||
{
|
||||
PartitionKey = new PartitionKey("pk"),
|
||||
};
|
||||
|
||||
FeedIterator<AvailabilityStrategyTestObject> queryIterator = container.GetItemQueryIterator<AvailabilityStrategyTestObject>(
|
||||
new QueryDefinition(queryString),
|
||||
requestOptions: requestOptions);
|
||||
|
||||
rule.Enable();
|
||||
|
||||
while (queryIterator.HasMoreResults)
|
||||
{
|
||||
FeedResponse<AvailabilityStrategyTestObject> feedResponse = await queryIterator.ReadNextAsync();
|
||||
|
||||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsQueryObject);
|
||||
excludeRegionsList = excludeRegionsQueryObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case "CrossPartitionQuery":
|
||||
string crossPartitionQueryString = "SELECT * FROM c";
|
||||
FeedIterator<AvailabilityStrategyTestObject> crossPartitionQueryIterator = container.GetItemQueryIterator<AvailabilityStrategyTestObject>(
|
||||
new QueryDefinition(crossPartitionQueryString));
|
||||
|
||||
rule.Enable();
|
||||
|
||||
while (crossPartitionQueryIterator.HasMoreResults)
|
||||
{
|
||||
FeedResponse<AvailabilityStrategyTestObject> feedResponse = await crossPartitionQueryIterator.ReadNextAsync();
|
||||
|
||||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsQueryObject);
|
||||
excludeRegionsList = excludeRegionsQueryObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case "ReadMany":
|
||||
rule.Enable();
|
||||
|
||||
FeedResponse<AvailabilityStrategyTestObject> readManyResponse = await container.ReadManyItemsAsync<AvailabilityStrategyTestObject>(
|
||||
new List<(string, PartitionKey)>()
|
||||
{
|
||||
("testId", new PartitionKey("pk")),
|
||||
("testId2", new PartitionKey("pk2")),
|
||||
("testId3", new PartitionKey("pk3")),
|
||||
("testId4", new PartitionKey("pk4"))
|
||||
});
|
||||
|
||||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsReadManyObject);
|
||||
excludeRegionsList = excludeRegionsReadManyObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
|
||||
break;
|
||||
|
||||
case "ChangeFeed":
|
||||
Container leaseContainer = database.GetContainer(this.changeFeedContainerName);
|
||||
ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder<AvailabilityStrategyTestObject>(
|
||||
processorName: "AvialabilityStrategyTest",
|
||||
onChangesDelegate: HandleChangesAsync)
|
||||
.WithInstanceName("test")
|
||||
.WithLeaseContainer(leaseContainer)
|
||||
.Build();
|
||||
await changeFeedProcessor.StartAsync();
|
||||
await Task.Delay(1000);
|
||||
AvailabilityStrategyTestObject testObject = new AvailabilityStrategyTestObject
|
||||
{
|
||||
Id = "testId5",
|
||||
Pk = "pk5",
|
||||
Other = "other"
|
||||
};
|
||||
await container.CreateItemAsync<AvailabilityStrategyTestObject>(testObject);
|
||||
|
||||
rule.Enable();
|
||||
|
||||
await Task.Delay(5000);
|
||||
|
||||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
|
||||
Assert.Fail("Invalid operation");
|
||||
break;
|
||||
}
|
||||
|
||||
rule.Disable();
|
||||
|
||||
faultInjectionClient.Dispose();
|
||||
}
|
||||
|
||||
[DataTestMethod]
|
||||
[TestCategory("MultiRegion")]
|
||||
[DataRow("Read", "Read", "ReadStep", DisplayName = "Read | ReadStep")]
|
||||
[DataRow("SinglePartitionQuery", "Query", "QueryStep", DisplayName = "Query | SinglePartitionQueryStep")]
|
||||
[DataRow("CrossPartitionQuery", "Query", "QueryStep", DisplayName = "Query | CrossPartitionQueryStep")]
|
||||
[DataRow("ReadMany", "ReadMany", "ReadManyStep", DisplayName = "ReadMany | ReadManyStep")]
|
||||
[DataRow("ChangeFeed", "ChangeFeed", "ChangeFeedStep", DisplayName = "ChangeFeed | ChangeFeedStep")]
|
||||
public async Task AvailabilityStrategyStepTests(string operation, string conditonName1, string conditionName2)
|
||||
{
|
||||
FaultInjectionCondition conditon1 = this.conditions[conditonName1];
|
||||
FaultInjectionCondition conditon2 = this.conditions[conditionName2];
|
||||
IFaultInjectionResult result = responseDelayResult;
|
||||
|
||||
FaultInjectionRule rule1 = new FaultInjectionRuleBuilder(
|
||||
id: operation,
|
||||
condition: conditon1,
|
||||
result: result)
|
||||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.Build();
|
||||
|
||||
FaultInjectionRule rule2 = new FaultInjectionRuleBuilder(
|
||||
id: operation,
|
||||
condition: conditon2,
|
||||
result: result)
|
||||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.Build();
|
||||
|
||||
List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { rule1, rule2 };
|
||||
FaultInjector faultInjector = new FaultInjector(rules);
|
||||
|
||||
rule1.Disable();
|
||||
rule2.Disable();
|
||||
|
||||
CosmosClientOptions clientOptions = new CosmosClientOptions()
|
||||
{
|
||||
ConnectionMode = ConnectionMode.Direct,
|
||||
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US", "East US" },
|
||||
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
|
||||
threshold: TimeSpan.FromMilliseconds(100),
|
||||
thresholdStep: TimeSpan.FromMilliseconds(50)),
|
||||
Serializer = this.cosmosSystemTextJsonSerializer
|
||||
};
|
||||
|
||||
CosmosClient faultInjectionClient = new CosmosClient(
|
||||
connectionString: this.connectionString,
|
||||
clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions));
|
||||
|
||||
Database database = faultInjectionClient.GetDatabase(this.dbName);
|
||||
Container container = database.GetContainer(this.containerName);
|
||||
|
||||
CosmosTraceDiagnostics traceDiagnostic;
|
||||
object hedgeContext;
|
||||
List<string> excludeRegionsList;
|
||||
switch (operation)
|
||||
{
|
||||
case "Read":
|
||||
rule1.Enable();
|
||||
rule2.Enable();
|
||||
|
||||
ItemResponse<AvailabilityStrategyTestObject> ir = await container.ReadItemAsync<AvailabilityStrategyTestObject>(
|
||||
"testId",
|
||||
new PartitionKey("pk"));
|
||||
|
||||
traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject);
|
||||
excludeRegionsList = excludeRegionsObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
Assert.IsTrue(excludeRegionsList.Contains("North Central US"));
|
||||
|
||||
break;
|
||||
|
||||
case "SinglePartitionQuery":
|
||||
string queryString = "SELECT * FROM c";
|
||||
|
||||
QueryRequestOptions requestOptions = new QueryRequestOptions()
|
||||
{
|
||||
PartitionKey = new PartitionKey("pk"),
|
||||
};
|
||||
|
||||
FeedIterator<AvailabilityStrategyTestObject> queryIterator = container.GetItemQueryIterator<AvailabilityStrategyTestObject>(
|
||||
new QueryDefinition(queryString),
|
||||
requestOptions: requestOptions);
|
||||
|
||||
rule1.Enable();
|
||||
rule2.Enable();
|
||||
|
||||
while (queryIterator.HasMoreResults)
|
||||
{
|
||||
FeedResponse<AvailabilityStrategyTestObject> feedResponse = await queryIterator.ReadNextAsync();
|
||||
|
||||
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsQueryObject);
|
||||
excludeRegionsList = excludeRegionsQueryObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case "CrossPartitionQuery":
|
||||
string crossPartitionQueryString = "SELECT * FROM c";
|
||||
FeedIterator<AvailabilityStrategyTestObject> crossPartitionQueryIterator = container.GetItemQueryIterator<AvailabilityStrategyTestObject>(
|
||||
new QueryDefinition(crossPartitionQueryString));
|
||||
|
||||
rule1.Enable();
|
||||
rule2.Enable();
|
||||
|
||||
while (crossPartitionQueryIterator.HasMoreResults)
|
||||
{
|
||||
FeedResponse<AvailabilityStrategyTestObject> feedResponse = await crossPartitionQueryIterator.ReadNextAsync();
|
||||
|
||||
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsQueryObject);
|
||||
excludeRegionsList = excludeRegionsQueryObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case "ReadMany":
|
||||
rule1.Enable();
|
||||
rule2.Enable();
|
||||
|
||||
FeedResponse<AvailabilityStrategyTestObject> readManyResponse = await container.ReadManyItemsAsync<AvailabilityStrategyTestObject>(
|
||||
new List<(string, PartitionKey)>()
|
||||
{
|
||||
("testId", new PartitionKey("pk")),
|
||||
("testId2", new PartitionKey("pk2")),
|
||||
("testId3", new PartitionKey("pk3")),
|
||||
("testId4", new PartitionKey("pk4"))
|
||||
});
|
||||
|
||||
traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsReadManyObject);
|
||||
excludeRegionsList = excludeRegionsReadManyObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
Assert.IsTrue(excludeRegionsList.Contains("North Central US"));
|
||||
|
||||
break;
|
||||
|
||||
case "ChangeFeed":
|
||||
Container leaseContainer = database.GetContainer(this.changeFeedContainerName);
|
||||
ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder<AvailabilityStrategyTestObject>(
|
||||
processorName: "AvialabilityStrategyTest",
|
||||
onChangesDelegate: HandleChangesStepAsync)
|
||||
.WithInstanceName("test")
|
||||
.WithLeaseContainer(leaseContainer)
|
||||
.Build();
|
||||
await changeFeedProcessor.StartAsync();
|
||||
await Task.Delay(1000);
|
||||
AvailabilityStrategyTestObject testObject = new AvailabilityStrategyTestObject
|
||||
{
|
||||
Id = "testId5",
|
||||
Pk = "pk5",
|
||||
Other = "other"
|
||||
};
|
||||
await container.CreateItemAsync<AvailabilityStrategyTestObject>(testObject);
|
||||
|
||||
rule1.Enable();
|
||||
rule2.Enable();
|
||||
|
||||
await Task.Delay(5000);
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
Assert.Fail("Invalid operation");
|
||||
break;
|
||||
}
|
||||
|
||||
rule1.Disable();
|
||||
rule2.Disable();
|
||||
|
||||
faultInjectionClient.Dispose();
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[TestCategory("MultiRegion")]
|
||||
public async Task RequestMessageCloneTests()
|
||||
{
|
||||
RequestMessage httpRequest = new RequestMessage(
|
||||
HttpMethod.Get,
|
||||
new Uri("/dbs/testdb/colls/testcontainer/docs/testId", UriKind.Relative));
|
||||
|
||||
string key = Guid.NewGuid().ToString();
|
||||
Dictionary<string, object> properties = new Dictionary<string, object>()
|
||||
{
|
||||
{ key, Guid.NewGuid() }
|
||||
};
|
||||
|
||||
RequestOptions requestOptions = new RequestOptions()
|
||||
{
|
||||
Properties = properties
|
||||
};
|
||||
|
||||
httpRequest.RequestOptions = requestOptions;
|
||||
httpRequest.ResourceType = ResourceType.Document;
|
||||
httpRequest.OperationType = OperationType.Read;
|
||||
httpRequest.Headers.CorrelatedActivityId = Guid.NewGuid().ToString();
|
||||
httpRequest.PartitionKeyRangeId = new PartitionKeyRangeIdentity("0", "1");
|
||||
httpRequest.UseGatewayMode = true;
|
||||
httpRequest.ContainerId = "testcontainer";
|
||||
httpRequest.DatabaseId = "testdb";
|
||||
httpRequest.Content = Stream.Null;
|
||||
|
||||
using (CloneableStream clonedBody = await StreamExtension.AsClonableStreamAsync(httpRequest.Content))
|
||||
{
|
||||
RequestMessage clone = httpRequest.Clone(httpRequest.Trace, clonedBody);
|
||||
|
||||
Assert.AreEqual(httpRequest.RequestOptions.Properties, clone.RequestOptions.Properties);
|
||||
Assert.AreEqual(httpRequest.ResourceType, clone.ResourceType);
|
||||
Assert.AreEqual(httpRequest.OperationType, clone.OperationType);
|
||||
Assert.AreEqual(httpRequest.Headers.CorrelatedActivityId, clone.Headers.CorrelatedActivityId);
|
||||
Assert.AreEqual(httpRequest.PartitionKeyRangeId, clone.PartitionKeyRangeId);
|
||||
Assert.AreEqual(httpRequest.UseGatewayMode, clone.UseGatewayMode);
|
||||
Assert.AreEqual(httpRequest.ContainerId, clone.ContainerId);
|
||||
Assert.AreEqual(httpRequest.DatabaseId, clone.DatabaseId);
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task HandleChangesAsync(
|
||||
ChangeFeedProcessorContext context,
|
||||
IReadOnlyCollection<AvailabilityStrategyTestObject> changes,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
|
||||
{
|
||||
Assert.Fail("Change Feed Processor took too long");
|
||||
}
|
||||
|
||||
CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject);
|
||||
List<string> excludeRegionsList = excludeRegionsObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
await Task.Delay(1);
|
||||
}
|
||||
|
||||
private static async Task HandleChangesStepAsync(
|
||||
ChangeFeedProcessorContext context,
|
||||
IReadOnlyCollection<AvailabilityStrategyTestObject> changes,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
|
||||
{
|
||||
Assert.Fail("Change Feed Processor took too long");
|
||||
}
|
||||
|
||||
CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("ExcludedRegions", out object excludeRegionsObject);
|
||||
List<string> excludeRegionsList = excludeRegionsObject as List<string>;
|
||||
Assert.IsTrue(excludeRegionsList.Contains("Central US"));
|
||||
Assert.IsTrue(excludeRegionsList.Contains("North Central US"));
|
||||
await Task.Delay(1);
|
||||
}
|
||||
|
||||
private class AvailabilityStrategyTestObject
|
||||
{
|
||||
|
||||
[JsonPropertyName("id")]
|
||||
public string Id { get; set; }
|
||||
|
||||
[JsonPropertyName("pk")]
|
||||
public string Pk { get; set; }
|
||||
|
||||
[JsonPropertyName("other")]
|
||||
public string Other { get; set; }
|
||||
}
|
||||
|
||||
private class CosmosSystemTextJsonSerializer : CosmosSerializer
|
||||
{
|
||||
private readonly JsonObjectSerializer systemTextJsonSerializer;
|
||||
|
||||
public CosmosSystemTextJsonSerializer(JsonSerializerOptions jsonSerializerOptions)
|
||||
{
|
||||
this.systemTextJsonSerializer = new JsonObjectSerializer(jsonSerializerOptions);
|
||||
}
|
||||
|
||||
public override T FromStream<T>(Stream stream)
|
||||
{
|
||||
using (stream)
|
||||
{
|
||||
if (stream.CanSeek
|
||||
&& stream.Length == 0)
|
||||
{
|
||||
return default;
|
||||
}
|
||||
|
||||
if (typeof(Stream).IsAssignableFrom(typeof(T)))
|
||||
{
|
||||
return (T)(object)stream;
|
||||
}
|
||||
|
||||
return (T)this.systemTextJsonSerializer.Deserialize(stream, typeof(T), default);
|
||||
}
|
||||
}
|
||||
|
||||
public override Stream ToStream<T>(T input)
|
||||
{
|
||||
MemoryStream streamPayload = new MemoryStream();
|
||||
this.systemTextJsonSerializer.Serialize(streamPayload, input, input.GetType(), default);
|
||||
streamPayload.Position = 0;
|
||||
return streamPayload;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -371,6 +371,9 @@
|
|||
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||
</None>
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\..\Microsoft.Azure.Cosmos.Samples\Tools\FaultInjection\src\FaultInjection.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<PropertyGroup>
|
||||
<SignAssembly>true</SignAssembly>
|
||||
|
|
|
@ -847,7 +847,7 @@ namespace Microsoft.Azure.Cosmos.Client.Tests
|
|||
{
|
||||
request.RequestContext.ExcludeRegions = excludeRegions;
|
||||
|
||||
ReadOnlyCollection<Uri> applicableEndpoints = endpointManager.GetApplicableEndpoints(request, isReadRequest);
|
||||
ReadOnlyCollection<Uri> applicableEndpoints = this.cache.GetApplicableEndpoints(request, isReadRequest);
|
||||
|
||||
Uri endpoint = endpointManager.ResolveServiceEndpoint(request);
|
||||
ReadOnlyCollection<Uri> applicableRegions = this.GetApplicableRegions(isReadRequest, useMultipleWriteLocations, usesPreferredLocations, excludeRegions);
|
||||
|
|
Загрузка…
Ссылка в новой задаче