Query: Adds ODE continuation token support for non-ODE pipelines (#4009)
* Added code to throw exception if ODE continuation token goes into non ODE pipeline * Removed count variable * Updated test name * Removed ODE continuation token logic from caller class * Simplified code * Fixed comments * Updated continuation token cast * Removed const string for continuation token * Added Ignore flag for test * Added baseline test * Updated baseline test
This commit is contained in:
Родитель
8c4f99f35e
Коммит
c46614b931
|
@ -752,7 +752,17 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
|
||||||
ContainerQueryProperties containerQueryProperties,
|
ContainerQueryProperties containerQueryProperties,
|
||||||
ITrace trace)
|
ITrace trace)
|
||||||
{
|
{
|
||||||
if (!inputParameters.EnableOptimisticDirectExecution) return null;
|
if (!inputParameters.EnableOptimisticDirectExecution)
|
||||||
|
{
|
||||||
|
if (inputParameters.InitialUserContinuationToken != null
|
||||||
|
&& OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken))
|
||||||
|
{
|
||||||
|
throw new MalformedContinuationTokenException($"The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. " +
|
||||||
|
$"{inputParameters.InitialUserContinuationToken}");
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
Debug.Assert(containerQueryProperties.ResourceId != null, "CosmosQueryExecutionContextFactory Assert!", "Container ResourceId cannot be null!");
|
Debug.Assert(containerQueryProperties.ResourceId != null, "CosmosQueryExecutionContextFactory Assert!", "Container ResourceId cannot be null!");
|
||||||
|
|
||||||
|
|
|
@ -4,9 +4,7 @@
|
||||||
|
|
||||||
namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQuery
|
namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQuery
|
||||||
{
|
{
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using Microsoft.Azure.Cosmos.ChangeFeed;
|
|
||||||
using Microsoft.Azure.Cosmos.CosmosElements;
|
using Microsoft.Azure.Cosmos.CosmosElements;
|
||||||
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
|
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
|
||||||
using Microsoft.Azure.Cosmos.Query.Core.Monads;
|
using Microsoft.Azure.Cosmos.Query.Core.Monads;
|
||||||
|
@ -30,6 +28,12 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQu
|
||||||
|
|
||||||
public Range<string> Range => this.Token.Range;
|
public Range<string> Range => this.Token.Range;
|
||||||
|
|
||||||
|
public static bool IsOptimisticDirectExecutionContinuationToken(CosmosElement continuationToken)
|
||||||
|
{
|
||||||
|
CosmosObject cosmosObjectContinuationToken = continuationToken as CosmosObject;
|
||||||
|
return !(cosmosObjectContinuationToken == null || !cosmosObjectContinuationToken.ContainsKey(OptimisticDirectExecutionToken));
|
||||||
|
}
|
||||||
|
|
||||||
public static CosmosElement ToCosmosElement(OptimisticDirectExecutionContinuationToken continuationToken)
|
public static CosmosElement ToCosmosElement(OptimisticDirectExecutionContinuationToken continuationToken)
|
||||||
{
|
{
|
||||||
CosmosElement inner = ParallelContinuationToken.ToCosmosElement(continuationToken.Token);
|
CosmosElement inner = ParallelContinuationToken.ToCosmosElement(continuationToken.Token);
|
||||||
|
@ -42,14 +46,14 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQu
|
||||||
|
|
||||||
public static TryCatch<OptimisticDirectExecutionContinuationToken> TryCreateFromCosmosElement(CosmosElement cosmosElement)
|
public static TryCatch<OptimisticDirectExecutionContinuationToken> TryCreateFromCosmosElement(CosmosElement cosmosElement)
|
||||||
{
|
{
|
||||||
CosmosObject cosmosObjectContinuationToken = cosmosElement as CosmosObject;
|
if (!IsOptimisticDirectExecutionContinuationToken(cosmosElement))
|
||||||
if (cosmosObjectContinuationToken == null || !cosmosObjectContinuationToken.ContainsKey(OptimisticDirectExecutionToken))
|
|
||||||
{
|
{
|
||||||
return TryCatch<OptimisticDirectExecutionContinuationToken>.FromException(
|
return TryCatch<OptimisticDirectExecutionContinuationToken>.FromException(
|
||||||
new MalformedContinuationTokenException(
|
new MalformedContinuationTokenException(
|
||||||
message: $"Malformed Continuation Token: Expected OptimisticDirectExecutionToken\r\n"));
|
message: $"Malformed Continuation Token: Expected OptimisticDirectExecutionToken\r\n"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CosmosObject cosmosObjectContinuationToken = (CosmosObject)cosmosElement;
|
||||||
TryCatch<ParallelContinuationToken> inner = ParallelContinuationToken.TryCreateFromCosmosElement(cosmosObjectContinuationToken[OptimisticDirectExecutionToken]);
|
TryCatch<ParallelContinuationToken> inner = ParallelContinuationToken.TryCreateFromCosmosElement(cosmosObjectContinuationToken[OptimisticDirectExecutionToken]);
|
||||||
|
|
||||||
return inner.Succeeded ?
|
return inner.Succeeded ?
|
||||||
|
|
|
@ -15,7 +15,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
||||||
using Cosmos.Scripts;
|
using Cosmos.Scripts;
|
||||||
using Microsoft.Azure.Cosmos.Fluent;
|
using Microsoft.Azure.Cosmos.Fluent;
|
||||||
using Microsoft.Azure.Cosmos.Linq;
|
using Microsoft.Azure.Cosmos.Linq;
|
||||||
using Microsoft.Azure.Cosmos.Query.Core;
|
|
||||||
using Microsoft.Azure.Cosmos.Tracing;
|
using Microsoft.Azure.Cosmos.Tracing;
|
||||||
using Microsoft.Azure.Documents.Collections;
|
using Microsoft.Azure.Documents.Collections;
|
||||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||||
|
@ -786,6 +785,58 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO: Remove Ignore flag once emulator is updated to 0415
|
||||||
|
[Ignore]
|
||||||
|
[TestMethod]
|
||||||
|
public async Task TesOdeTokenCompatibilityWithNonOdePipeline()
|
||||||
|
{
|
||||||
|
string query = "select top 200 * from c";
|
||||||
|
CosmosClient client = DirectCosmosClient;
|
||||||
|
Container container = client.GetContainer(DatabaseId, ContainerId);
|
||||||
|
|
||||||
|
// Create items
|
||||||
|
for (int i = 0; i < 500; i++)
|
||||||
|
{
|
||||||
|
await container.CreateItemAsync<ToDoActivity>(ToDoActivity.CreateRandomToDoActivity());
|
||||||
|
}
|
||||||
|
|
||||||
|
QueryRequestOptions queryRequestOptions = new QueryRequestOptions
|
||||||
|
{
|
||||||
|
MaxItemCount = 50,
|
||||||
|
EnableOptimisticDirectExecution = true
|
||||||
|
};
|
||||||
|
|
||||||
|
FeedIteratorInternal feedIterator =
|
||||||
|
(FeedIteratorInternal)container.GetItemQueryStreamIterator(
|
||||||
|
query,
|
||||||
|
null,
|
||||||
|
queryRequestOptions);
|
||||||
|
|
||||||
|
ResponseMessage responseMessage = await feedIterator.ReadNextAsync(CancellationToken.None);
|
||||||
|
string continuationToken = responseMessage.ContinuationToken;
|
||||||
|
|
||||||
|
QueryRequestOptions newQueryRequestOptions = new QueryRequestOptions
|
||||||
|
{
|
||||||
|
MaxItemCount = 50,
|
||||||
|
EnableOptimisticDirectExecution = false
|
||||||
|
};
|
||||||
|
|
||||||
|
// use Continuation Token to create new iterator and use same trace
|
||||||
|
FeedIterator feedIteratorNew =
|
||||||
|
container.GetItemQueryStreamIterator(
|
||||||
|
query,
|
||||||
|
continuationToken,
|
||||||
|
newQueryRequestOptions);
|
||||||
|
|
||||||
|
while (feedIteratorNew.HasMoreResults)
|
||||||
|
{
|
||||||
|
responseMessage = await feedIteratorNew.ReadNextAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
string expectedErrorMessage = "The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. ";
|
||||||
|
Assert.IsTrue(responseMessage.CosmosException.ToString().Contains(expectedErrorMessage));
|
||||||
|
}
|
||||||
|
|
||||||
private class CustomHandler : RequestHandler
|
private class CustomHandler : RequestHandler
|
||||||
{
|
{
|
||||||
string correlatedActivityId;
|
string correlatedActivityId;
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
using Microsoft.Azure.Cosmos.Query.Core.Pipeline;
|
using Microsoft.Azure.Cosmos.Query.Core.Pipeline;
|
||||||
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy;
|
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy;
|
||||||
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel;
|
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel;
|
||||||
|
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQuery;
|
||||||
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
|
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
|
||||||
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
|
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
|
||||||
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
|
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
|
||||||
|
@ -190,6 +191,49 @@
|
||||||
Assert.AreEqual(100, documentCountInSinglePartition);
|
Assert.AreEqual(100, documentCountInSinglePartition);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[TestMethod]
|
||||||
|
public async Task TestOdeTokenWithSpecializedPipeline()
|
||||||
|
{
|
||||||
|
int numItems = 100;
|
||||||
|
ParallelContinuationToken parallelContinuationToken = new ParallelContinuationToken(
|
||||||
|
token: Guid.NewGuid().ToString(),
|
||||||
|
range: new Documents.Routing.Range<string>("A", "B", true, false));
|
||||||
|
|
||||||
|
OptimisticDirectExecutionContinuationToken optimisticDirectExecutionContinuationToken = new OptimisticDirectExecutionContinuationToken(parallelContinuationToken);
|
||||||
|
CosmosElement cosmosElementContinuationToken = OptimisticDirectExecutionContinuationToken.ToCosmosElement(optimisticDirectExecutionContinuationToken);
|
||||||
|
|
||||||
|
OptimisticDirectExecutionTestInput input = CreateInput(
|
||||||
|
description: @"Single Partition Key and Value Field",
|
||||||
|
query: "SELECT VALUE COUNT(1) FROM c",
|
||||||
|
expectedOptimisticDirectExecution: false,
|
||||||
|
partitionKeyPath: @"/pk",
|
||||||
|
partitionKeyValue: "a",
|
||||||
|
continuationToken: cosmosElementContinuationToken);
|
||||||
|
|
||||||
|
DocumentContainer documentContainer = await CreateDocumentContainerAsync(numItems, multiPartition: false);
|
||||||
|
QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: input.ExpectedOptimisticDirectExecution);
|
||||||
|
(CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions);
|
||||||
|
IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create(
|
||||||
|
documentContainer,
|
||||||
|
cosmosQueryContextCore,
|
||||||
|
inputParameters,
|
||||||
|
NoOpTrace.Singleton);
|
||||||
|
|
||||||
|
string expectedErrorMessage = "The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. ";
|
||||||
|
|
||||||
|
while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton))
|
||||||
|
{
|
||||||
|
if (queryPipelineStage.Current.Failed)
|
||||||
|
{
|
||||||
|
Assert.IsTrue(queryPipelineStage.Current.InnerMostException.ToString().Contains(expectedErrorMessage));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.IsFalse(true);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
[TestMethod]
|
[TestMethod]
|
||||||
public async Task TestQueriesWhichNeverRequireDistribution()
|
public async Task TestQueriesWhichNeverRequireDistribution()
|
||||||
{
|
{
|
||||||
|
|
Загрузка…
Ссылка в новой задаче