Merge branch 'master' into users/aavasthy/thinclient_serializer
This commit is contained in:
Коммит
c6d77a0466
|
@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Cosmos.Core.Trace;
|
||||
using Microsoft.Azure.Cosmos.Diagnostics;
|
||||
using Microsoft.Azure.Cosmos.Tracing;
|
||||
using Microsoft.Azure.Documents;
|
||||
|
||||
/// <summary>
|
||||
|
@ -22,10 +23,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
/// </summary>
|
||||
internal class CrossRegionParallelHedgingAvailabilityStrategy : AvailabilityStrategy
|
||||
{
|
||||
private const string HedgeRegions = "Hedge Regions";
|
||||
private const string HedgeContext = "Hedge Context";
|
||||
private const string HedgeContextOriginalRequest = "Original Request";
|
||||
private const string HedgeContextHedgedRequest = "Hedged Request";
|
||||
private const string ResponseRegion = "Response Region";
|
||||
|
||||
/// <summary>
|
||||
/// Latency threshold which activates the first region hedging
|
||||
|
@ -106,6 +105,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
{
|
||||
return await sender(request, cancellationToken);
|
||||
}
|
||||
|
||||
ITrace trace = request.Trace;
|
||||
|
||||
using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
|
||||
{
|
||||
|
@ -120,9 +121,8 @@ namespace Microsoft.Azure.Cosmos
|
|||
|
||||
List<Task> requestTasks = new List<Task>(hedgeRegions.Count + 1);
|
||||
|
||||
Task<(bool, ResponseMessage)> primaryRequest = null;
|
||||
|
||||
ResponseMessage responseMessage = null;
|
||||
Task<HedgingResponse> primaryRequest = null;
|
||||
HedgingResponse hedgeResponse = null;
|
||||
|
||||
//Send out hedged requests
|
||||
for (int requestNumber = 0; requestNumber < hedgeRegions.Count; requestNumber++)
|
||||
|
@ -139,6 +139,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
primaryRequest = this.RequestSenderAndResultCheckAsync(
|
||||
sender,
|
||||
request,
|
||||
hedgeRegions.ElementAt(requestNumber),
|
||||
cancellationToken,
|
||||
cancellationTokenSource);
|
||||
|
||||
|
@ -146,12 +147,13 @@ namespace Microsoft.Azure.Cosmos
|
|||
}
|
||||
else
|
||||
{
|
||||
Task<(bool, ResponseMessage)> requestTask = this.CloneAndSendAsync(
|
||||
Task<HedgingResponse> requestTask = this.CloneAndSendAsync(
|
||||
sender: sender,
|
||||
request: request,
|
||||
clonedBody: clonedBody,
|
||||
hedgeRegions: hedgeRegions,
|
||||
requestNumber: requestNumber,
|
||||
trace: trace,
|
||||
cancellationToken: cancellationToken,
|
||||
cancellationTokenSource: cancellationTokenSource);
|
||||
|
||||
|
@ -176,19 +178,18 @@ namespace Microsoft.Azure.Cosmos
|
|||
AggregateException innerExceptions = completedTask.Exception.Flatten();
|
||||
}
|
||||
|
||||
(bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask;
|
||||
if (isNonTransient)
|
||||
hedgeResponse = await (Task<HedgingResponse>)completedTask;
|
||||
if (hedgeResponse.IsNonTransient)
|
||||
{
|
||||
cancellationTokenSource.Cancel();
|
||||
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
HedgeRegions,
|
||||
HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions()));
|
||||
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
//Take is not inclusive, so we need to add 1 to the request number which starts at 0
|
||||
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
HedgeContext,
|
||||
object.ReferenceEquals(primaryRequest, completedTask)
|
||||
? HedgeContextOriginalRequest
|
||||
: HedgeContextHedgedRequest);
|
||||
return responseMessage;
|
||||
hedgeRegions.Take(requestNumber + 1));
|
||||
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
ResponseRegion,
|
||||
hedgeResponse.ResponseRegion);
|
||||
return hedgeResponse.ResponseMessage;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -206,19 +207,17 @@ namespace Microsoft.Azure.Cosmos
|
|||
lastException = innerExceptions.InnerExceptions.FirstOrDefault();
|
||||
}
|
||||
|
||||
(bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask;
|
||||
if (isNonTransient || requestTasks.Count == 0)
|
||||
hedgeResponse = await (Task<HedgingResponse>)completedTask;
|
||||
if (hedgeResponse.IsNonTransient || requestTasks.Count == 0)
|
||||
{
|
||||
cancellationTokenSource.Cancel();
|
||||
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
HedgeRegions,
|
||||
HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions()));
|
||||
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
HedgeContext,
|
||||
object.ReferenceEquals(primaryRequest, completedTask)
|
||||
? HedgeContextOriginalRequest
|
||||
: HedgeContextHedgedRequest);
|
||||
return responseMessage;
|
||||
hedgeRegions);
|
||||
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
|
||||
ResponseRegion,
|
||||
hedgeResponse.ResponseRegion);
|
||||
return hedgeResponse.ResponseMessage;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -227,41 +226,48 @@ namespace Microsoft.Azure.Cosmos
|
|||
throw lastException;
|
||||
}
|
||||
|
||||
Debug.Assert(responseMessage != null);
|
||||
return responseMessage;
|
||||
Debug.Assert(hedgeResponse != null);
|
||||
return hedgeResponse.ResponseMessage;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<(bool, ResponseMessage)> CloneAndSendAsync(
|
||||
private async Task<HedgingResponse> CloneAndSendAsync(
|
||||
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
|
||||
RequestMessage request,
|
||||
CloneableStream clonedBody,
|
||||
IReadOnlyCollection<string> hedgeRegions,
|
||||
int requestNumber,
|
||||
ITrace trace,
|
||||
CancellationToken cancellationToken,
|
||||
CancellationTokenSource cancellationTokenSource)
|
||||
{
|
||||
RequestMessage clonedRequest;
|
||||
using (clonedRequest = request.Clone(request.Trace.Parent, clonedBody))
|
||||
|
||||
using (clonedRequest = request.Clone(
|
||||
trace,
|
||||
clonedBody))
|
||||
{
|
||||
clonedRequest.RequestOptions ??= new RequestOptions();
|
||||
|
||||
List<string> excludeRegions = new List<string>(hedgeRegions);
|
||||
string region = excludeRegions[requestNumber];
|
||||
excludeRegions.RemoveAt(requestNumber);
|
||||
clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;
|
||||
|
||||
return await this.RequestSenderAndResultCheckAsync(
|
||||
sender,
|
||||
clonedRequest,
|
||||
region,
|
||||
cancellationToken,
|
||||
cancellationTokenSource);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<(bool, ResponseMessage)> RequestSenderAndResultCheckAsync(
|
||||
private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
|
||||
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
|
||||
RequestMessage request,
|
||||
string hedgedRegion,
|
||||
CancellationToken cancellationToken,
|
||||
CancellationTokenSource cancellationTokenSource)
|
||||
{
|
||||
|
@ -274,14 +280,15 @@ namespace Microsoft.Azure.Cosmos
|
|||
{
|
||||
cancellationTokenSource.Cancel();
|
||||
}
|
||||
return (true, response);
|
||||
|
||||
return new HedgingResponse(true, response, hedgedRegion);
|
||||
}
|
||||
|
||||
return (false, response);
|
||||
return new HedgingResponse(false, response, hedgedRegion);
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested)
|
||||
{
|
||||
return (false, null);
|
||||
return new HedgingResponse(false, null, hedgedRegion);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
@ -315,9 +322,18 @@ namespace Microsoft.Azure.Cosmos
|
|||
return statusCode == (int)HttpStatusCode.NotFound && subStatusCode == (int)SubStatusCodes.Unknown;
|
||||
}
|
||||
|
||||
private static string HedgeRegionsToString(IReadOnlyList<(string, Uri)> hedgeRegions)
|
||||
private sealed class HedgingResponse
|
||||
{
|
||||
return string.Join(",", hedgeRegions);
|
||||
public readonly bool IsNonTransient;
|
||||
public readonly ResponseMessage ResponseMessage;
|
||||
public readonly string ResponseRegion;
|
||||
|
||||
public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage, string responseRegion)
|
||||
{
|
||||
this.IsNonTransient = isNonTransient;
|
||||
this.ResponseMessage = responseMessage;
|
||||
this.ResponseRegion = responseRegion;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
using System.Collections.Generic;
|
||||
using System.Data;
|
||||
using System.IO;
|
||||
using System.Net.Http;
|
||||
using System.Linq;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
using System.Threading;
|
||||
|
@ -14,7 +14,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
using Microsoft.Azure.Cosmos;
|
||||
using Microsoft.Azure.Cosmos.Diagnostics;
|
||||
using Microsoft.Azure.Cosmos.FaultInjection;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
using Database = Database;
|
||||
using PartitionKey = PartitionKey;
|
||||
|
@ -22,6 +21,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
[TestClass]
|
||||
public class CosmosAvailabilityStrategyTests
|
||||
{
|
||||
private const string centralUS = "Central US";
|
||||
private const string northCentralUS = "North Central US";
|
||||
private const string eastUs = "East US";
|
||||
|
||||
private CosmosClient client;
|
||||
private Database database;
|
||||
|
@ -184,7 +186,21 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.Build();
|
||||
|
||||
List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { responseDelay };
|
||||
FaultInjectionRule responseDelay2 = new FaultInjectionRuleBuilder(
|
||||
id: "responseDely",
|
||||
condition:
|
||||
new FaultInjectionConditionBuilder()
|
||||
.WithRegion("North Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.ReadItem)
|
||||
.Build(),
|
||||
result:
|
||||
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
|
||||
.WithDelay(TimeSpan.FromMilliseconds(3000))
|
||||
.Build())
|
||||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.Build();
|
||||
|
||||
List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { responseDelay, responseDelay2 };
|
||||
FaultInjector faultInjector = new FaultInjector(rules);
|
||||
|
||||
responseDelay.Disable();
|
||||
|
@ -194,7 +210,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
ConnectionMode = ConnectionMode.Direct,
|
||||
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
|
||||
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
|
||||
threshold: TimeSpan.FromMilliseconds(1500),
|
||||
threshold: TimeSpan.FromMilliseconds(300),
|
||||
thresholdStep: TimeSpan.FromMilliseconds(50)),
|
||||
Serializer = this.cosmosSystemTextJsonSerializer
|
||||
};
|
||||
|
@ -211,10 +227,18 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
|
||||
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out object responseRegion);
|
||||
Assert.IsNotNull(responseRegion);
|
||||
Assert.AreEqual(centralUS, (string)responseRegion);
|
||||
|
||||
//Should send out hedge request but original should be returned
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Original Request", (string)hedgeContext);
|
||||
|
||||
IReadOnlyCollection<string> hedgeContextList;
|
||||
hedgeContextList = hedgeContext as IReadOnlyCollection<string>;
|
||||
Assert.AreEqual(2, hedgeContextList.Count);
|
||||
Assert.IsTrue(hedgeContextList.Contains(centralUS));
|
||||
Assert.IsTrue(hedgeContextList.Contains(northCentralUS));
|
||||
faultInjectionClient.Dispose();
|
||||
}
|
||||
|
||||
|
@ -270,10 +294,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
|
||||
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
faultInjectionClient.Dispose();
|
||||
Assert.AreEqual(northCentralUS, (string)hedgeContext);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
|
@ -331,6 +354,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
|
||||
Assert.IsFalse(traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out _));
|
||||
|
||||
faultInjectionClient.Dispose();
|
||||
}
|
||||
|
||||
|
@ -430,9 +455,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
Assert.AreEqual(northCentralUS, (string)hedgeContext);
|
||||
|
||||
break;
|
||||
|
||||
|
@ -457,9 +482,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
Assert.AreEqual(northCentralUS, (string)hedgeContext);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -478,9 +503,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
Assert.AreEqual(northCentralUS, (string)hedgeContext);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -500,9 +525,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
Assert.IsTrue(rule.GetHitCount() > 0);
|
||||
traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
Assert.AreEqual(northCentralUS, (string)hedgeContext);
|
||||
|
||||
break;
|
||||
|
||||
|
@ -608,9 +633,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
|
||||
traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
Assert.AreEqual(eastUs, (string)hedgeContext);
|
||||
|
||||
break;
|
||||
|
||||
|
@ -635,9 +660,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
|
||||
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
Assert.AreEqual(eastUs, (string)hedgeContext);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -656,9 +681,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
|
||||
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
Assert.AreEqual(eastUs, (string)hedgeContext);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -678,9 +703,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
|
||||
traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
Assert.AreEqual(eastUs, (string)hedgeContext);
|
||||
|
||||
break;
|
||||
|
||||
|
@ -720,50 +745,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
faultInjectionClient.Dispose();
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[TestCategory("MultiRegion")]
|
||||
public async Task RequestMessageCloneTests()
|
||||
{
|
||||
RequestMessage httpRequest = new RequestMessage(
|
||||
HttpMethod.Get,
|
||||
new Uri("/dbs/testdb/colls/testcontainer/docs/testId", UriKind.Relative));
|
||||
|
||||
string key = Guid.NewGuid().ToString();
|
||||
Dictionary<string, object> properties = new Dictionary<string, object>()
|
||||
{
|
||||
{ key, Guid.NewGuid() }
|
||||
};
|
||||
|
||||
RequestOptions requestOptions = new RequestOptions()
|
||||
{
|
||||
Properties = properties
|
||||
};
|
||||
|
||||
httpRequest.RequestOptions = requestOptions;
|
||||
httpRequest.ResourceType = ResourceType.Document;
|
||||
httpRequest.OperationType = OperationType.Read;
|
||||
httpRequest.Headers.CorrelatedActivityId = Guid.NewGuid().ToString();
|
||||
httpRequest.PartitionKeyRangeId = new PartitionKeyRangeIdentity("0", "1");
|
||||
httpRequest.UseGatewayMode = true;
|
||||
httpRequest.ContainerId = "testcontainer";
|
||||
httpRequest.DatabaseId = "testdb";
|
||||
httpRequest.Content = Stream.Null;
|
||||
|
||||
using (CloneableStream clonedBody = await StreamExtension.AsClonableStreamAsync(httpRequest.Content))
|
||||
{
|
||||
RequestMessage clone = httpRequest.Clone(httpRequest.Trace, clonedBody);
|
||||
|
||||
Assert.AreEqual(httpRequest.RequestOptions.Properties, clone.RequestOptions.Properties);
|
||||
Assert.AreEqual(httpRequest.ResourceType, clone.ResourceType);
|
||||
Assert.AreEqual(httpRequest.OperationType, clone.OperationType);
|
||||
Assert.AreEqual(httpRequest.Headers.CorrelatedActivityId, clone.Headers.CorrelatedActivityId);
|
||||
Assert.AreEqual(httpRequest.PartitionKeyRangeId, clone.PartitionKeyRangeId);
|
||||
Assert.AreEqual(httpRequest.UseGatewayMode, clone.UseGatewayMode);
|
||||
Assert.AreEqual(httpRequest.ContainerId, clone.ContainerId);
|
||||
Assert.AreEqual(httpRequest.DatabaseId, clone.DatabaseId);
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task HandleChangesAsync(
|
||||
ChangeFeedProcessorContext context,
|
||||
IReadOnlyCollection<AvailabilityStrategyTestObject> changes,
|
||||
|
@ -776,10 +757,9 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
|
||||
CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
|
||||
Assert.AreNotEqual(centralUS, (string)hedgeContext);
|
||||
await Task.Delay(1);
|
||||
}
|
||||
|
||||
|
@ -795,10 +775,10 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
|||
|
||||
CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics;
|
||||
Assert.IsNotNull(traceDiagnostic);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
|
||||
traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
|
||||
Assert.IsNotNull(hedgeContext);
|
||||
Assert.AreEqual("Hedged Request", (string)hedgeContext);
|
||||
|
||||
Assert.AreNotEqual(centralUS, (string)hedgeContext);
|
||||
Assert.AreNotEqual(northCentralUS, (string)hedgeContext);
|
||||
await Task.Delay(1);
|
||||
}
|
||||
|
||||
|
|
|
@ -5,22 +5,26 @@
|
|||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Cosmos;
|
||||
using Microsoft.Azure.Cosmos.Diagnostics;
|
||||
using Microsoft.Azure.Cosmos.FaultInjection;
|
||||
using Microsoft.Azure.Cosmos.Tracing;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
|
||||
[TestClass]
|
||||
public class CosmosMultiRegionDiagnosticsTests
|
||||
{
|
||||
CosmosClient client;
|
||||
Database database;
|
||||
Container container;
|
||||
|
||||
string connectionString;
|
||||
string dbName;
|
||||
string containerName;
|
||||
|
||||
[TestInitialize]
|
||||
public async Task TestInitialize()
|
||||
{
|
||||
string connectionString = ConfigurationManager.GetEnvironmentVariable<string>("COSMOSDB_MULTI_REGION", null);
|
||||
this.client = new CosmosClient(connectionString);
|
||||
this.connectionString = ConfigurationManager.GetEnvironmentVariable<string>("COSMOSDB_MULTI_REGION", null);
|
||||
this.client = new CosmosClient(this.connectionString);
|
||||
|
||||
this.dbName = Guid.NewGuid().ToString();
|
||||
this.database = await this.client.CreateDatabaseIfNotExistsAsync(this.dbName);
|
||||
|
@ -61,5 +65,80 @@
|
|||
Assert.IsTrue(excludeRegionsList.Contains("North Central US"));
|
||||
Assert.IsTrue(excludeRegionsList.Contains("East US"));
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[TestCategory("MultiRegion")]
|
||||
public async Task HedgeNestingDiagnosticsTest()
|
||||
{
|
||||
//Wait for global replication
|
||||
await Task.Delay(60 * 1000);
|
||||
|
||||
FaultInjectionRule responseDelay = new FaultInjectionRuleBuilder(
|
||||
id: "responseDely",
|
||||
condition:
|
||||
new FaultInjectionConditionBuilder()
|
||||
.WithRegion("Central US")
|
||||
.WithOperationType(FaultInjectionOperationType.ReadItem)
|
||||
.Build(),
|
||||
result:
|
||||
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
|
||||
.WithDelay(TimeSpan.FromMilliseconds(4000))
|
||||
.Build())
|
||||
.WithDuration(TimeSpan.FromMinutes(90))
|
||||
.Build();
|
||||
|
||||
List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { responseDelay };
|
||||
FaultInjector faultInjector = new FaultInjector(rules);
|
||||
|
||||
responseDelay.Disable();
|
||||
|
||||
CosmosClientOptions clientOptions = new CosmosClientOptions()
|
||||
{
|
||||
ConnectionMode = ConnectionMode.Direct,
|
||||
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
|
||||
};
|
||||
|
||||
using (CosmosClient faultInjectionClient = new CosmosClient(
|
||||
connectionString: this.connectionString,
|
||||
clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions)))
|
||||
{
|
||||
Database database = faultInjectionClient.GetDatabase(this.dbName);
|
||||
Container container = database.GetContainer(this.containerName);
|
||||
|
||||
responseDelay.Enable();
|
||||
|
||||
ItemRequestOptions requestOptions = new ItemRequestOptions
|
||||
{
|
||||
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
|
||||
threshold: TimeSpan.FromMilliseconds(100),
|
||||
thresholdStep: TimeSpan.FromMilliseconds(50))
|
||||
};
|
||||
|
||||
//Request should be hedged to North Central US
|
||||
ItemResponse<ToDoActivity> itemResponse = await container.ReadItemAsync<ToDoActivity>(
|
||||
"1", new PartitionKey("1"),
|
||||
requestOptions);
|
||||
|
||||
CosmosTraceDiagnostics traceDiagnostic = itemResponse.Diagnostics as CosmosTraceDiagnostics;
|
||||
|
||||
//Walthrough the diagnostics to ensure at Request Invoker Handler Level
|
||||
//has two Diagnostics Handler Children
|
||||
IReadOnlyList<ITrace> traceChildren = traceDiagnostic.Value.Children;
|
||||
int diagnosticsHandlerCount = 0;
|
||||
foreach (ITrace trace in traceChildren)
|
||||
{
|
||||
if (trace.Name == "Microsoft.Azure.Cosmos.Handlers.RequestInvokerHandler")
|
||||
{
|
||||
foreach (ITrace childTrace in trace.Children)
|
||||
{
|
||||
if (childTrace.Name == "Microsoft.Azure.Cosmos.Handlers.DiagnosticsHandler")
|
||||
{
|
||||
diagnosticsHandlerCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
namespace Microsoft.Azure.Cosmos.Tests
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Net.Http;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Cosmos;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for <see cref="AvailabilityStrategy"/>
|
||||
/// </summary>
|
||||
[TestClass]
|
||||
public class AvailabilityStrategyUnitTests
|
||||
{
|
||||
[TestMethod]
|
||||
public async Task RequestMessageCloneTests()
|
||||
{
|
||||
RequestMessage httpRequest = new RequestMessage(
|
||||
HttpMethod.Get,
|
||||
new Uri("/dbs/testdb/colls/testcontainer/docs/testId", UriKind.Relative));
|
||||
|
||||
string key = Guid.NewGuid().ToString();
|
||||
Dictionary<string, object> properties = new Dictionary<string, object>()
|
||||
{
|
||||
{ key, Guid.NewGuid() }
|
||||
};
|
||||
|
||||
RequestOptions requestOptions = new RequestOptions()
|
||||
{
|
||||
Properties = properties
|
||||
};
|
||||
|
||||
httpRequest.RequestOptions = requestOptions;
|
||||
httpRequest.ResourceType = ResourceType.Document;
|
||||
httpRequest.OperationType = OperationType.Read;
|
||||
httpRequest.Headers.CorrelatedActivityId = Guid.NewGuid().ToString();
|
||||
httpRequest.PartitionKeyRangeId = new PartitionKeyRangeIdentity("0", "1");
|
||||
httpRequest.UseGatewayMode = true;
|
||||
httpRequest.ContainerId = "testcontainer";
|
||||
httpRequest.DatabaseId = "testdb";
|
||||
httpRequest.Content = Stream.Null;
|
||||
|
||||
using (CloneableStream clonedBody = await StreamExtension.AsClonableStreamAsync(httpRequest.Content))
|
||||
{
|
||||
RequestMessage clone = httpRequest.Clone(httpRequest.Trace, clonedBody);
|
||||
|
||||
Assert.AreEqual(httpRequest.RequestOptions.Properties, clone.RequestOptions.Properties);
|
||||
Assert.AreEqual(httpRequest.ResourceType, clone.ResourceType);
|
||||
Assert.AreEqual(httpRequest.OperationType, clone.OperationType);
|
||||
Assert.AreEqual(httpRequest.Headers.CorrelatedActivityId, clone.Headers.CorrelatedActivityId);
|
||||
Assert.AreEqual(httpRequest.PartitionKeyRangeId, clone.PartitionKeyRangeId);
|
||||
Assert.AreEqual(httpRequest.UseGatewayMode, clone.UseGatewayMode);
|
||||
Assert.AreEqual(httpRequest.ContainerId, clone.ContainerId);
|
||||
Assert.AreEqual(httpRequest.DatabaseId, clone.DatabaseId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Telemetry
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Reflection;
|
||||
|
@ -18,6 +19,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Telemetry
|
|||
using Microsoft.Azure.Cosmos.Tracing;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
using Moq;
|
||||
using static Microsoft.Azure.Cosmos.CrossRegionParallelHedgingAvailabilityStrategy;
|
||||
|
||||
[TestClass]
|
||||
public class OpenTelemetryRecorderTests
|
||||
|
@ -86,7 +88,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Telemetry
|
|||
{ "StoredProcedureExecuteResponse`1",new Mock<StoredProcedureExecuteResponse<object>>().Object },
|
||||
{ "StoredProcedureResponse", new Mock<StoredProcedureResponse>().Object },
|
||||
{ "TriggerResponse", new Mock<TriggerResponse>().Object },
|
||||
{ "UserDefinedFunctionResponse", new Mock<UserDefinedFunctionResponse>().Object }
|
||||
{ "UserDefinedFunctionResponse", new Mock<UserDefinedFunctionResponse>().Object },
|
||||
{ "HedgingResponse", "HedgingResponse" },
|
||||
};
|
||||
|
||||
Assembly asm = OpenTelemetryRecorderTests.GetAssemblyLocally(DllName);
|
||||
|
@ -165,6 +168,13 @@ namespace Microsoft.Azure.Cosmos.Tests.Telemetry
|
|||
{
|
||||
_ = new OpenTelemetryResponse<StoredProcedureExecuteResponse<object>>(storedProcedureExecuteResponse);
|
||||
}
|
||||
else if (instance is string hedgingResponse)
|
||||
{
|
||||
Assert.AreEqual(
|
||||
"HedgingResponse",
|
||||
hedgingResponse,
|
||||
"HedgingResponse is only used internally in the CrossRegionParallelHedgingAvailabilityStrategy and is never returned. No support Needed.");
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert.Fail("Opentelemetry does not support this response type " + className.Name);
|
||||
|
|
Загрузка…
Ссылка в новой задаче