From c46614b9311365f203d489394cf475e212ca202f Mon Sep 17 00:00:00 2001 From: akotalwar <94020786+akotalwar@users.noreply.github.com> Date: Sat, 5 Aug 2023 13:44:45 -0700 Subject: [PATCH] Query: Adds ODE continuation token support for non-ODE pipelines (#4009) * Added code to throw exception if ODE continuation token goes into non ODE pipeline * Removed count variable * Updated test name * Removed ODE continuation token logic from caller class * Simplified code * Fixed comments * Updated continuation token cast * Removed const string for continuation token * Added Ignore flag for test * Added baseline test * Updated baseline test --- .../CosmosQueryExecutionContextFactory.cs | 12 ++++- ...imisticDirectExecutionContinuationToken.cs | 16 +++--- .../CosmosBasicQueryTests.cs | 53 ++++++++++++++++++- ...misticDirectExecutionQueryBaselineTests.cs | 44 +++++++++++++++ 4 files changed, 117 insertions(+), 8 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs index 47aa1c449..43ebf6a2d 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs @@ -752,7 +752,17 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext ContainerQueryProperties containerQueryProperties, ITrace trace) { - if (!inputParameters.EnableOptimisticDirectExecution) return null; + if (!inputParameters.EnableOptimisticDirectExecution) + { + if (inputParameters.InitialUserContinuationToken != null + && OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken)) + { + throw new MalformedContinuationTokenException($"The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. " + + $"{inputParameters.InitialUserContinuationToken}"); + } + + return null; + } Debug.Assert(containerQueryProperties.ResourceId != null, "CosmosQueryExecutionContextFactory Assert!", "Container ResourceId cannot be null!"); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionContinuationToken.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionContinuationToken.cs index 59f87295f..76b2531f0 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionContinuationToken.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionContinuationToken.cs @@ -4,9 +4,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQuery { - using System; using System.Collections.Generic; - using Microsoft.Azure.Cosmos.ChangeFeed; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Query.Core.Exceptions; using Microsoft.Azure.Cosmos.Query.Core.Monads; @@ -30,6 +28,12 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQu public Range Range => this.Token.Range; + public static bool IsOptimisticDirectExecutionContinuationToken(CosmosElement continuationToken) + { + CosmosObject cosmosObjectContinuationToken = continuationToken as CosmosObject; + return !(cosmosObjectContinuationToken == null || !cosmosObjectContinuationToken.ContainsKey(OptimisticDirectExecutionToken)); + } + public static CosmosElement ToCosmosElement(OptimisticDirectExecutionContinuationToken continuationToken) { CosmosElement inner = ParallelContinuationToken.ToCosmosElement(continuationToken.Token); @@ -42,14 +46,14 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQu public static TryCatch TryCreateFromCosmosElement(CosmosElement cosmosElement) { - CosmosObject cosmosObjectContinuationToken = cosmosElement as CosmosObject; - if (cosmosObjectContinuationToken == null || !cosmosObjectContinuationToken.ContainsKey(OptimisticDirectExecutionToken)) + if (!IsOptimisticDirectExecutionContinuationToken(cosmosElement)) { return TryCatch.FromException( - new MalformedContinuationTokenException( - message: $"Malformed Continuation Token: Expected OptimisticDirectExecutionToken\r\n")); + new MalformedContinuationTokenException( + message: $"Malformed Continuation Token: Expected OptimisticDirectExecutionToken\r\n")); } + CosmosObject cosmosObjectContinuationToken = (CosmosObject)cosmosElement; TryCatch inner = ParallelContinuationToken.TryCreateFromCosmosElement(cosmosObjectContinuationToken[OptimisticDirectExecutionToken]); return inner.Succeeded ? diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosBasicQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosBasicQueryTests.cs index 25ac9a705..bfd636610 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosBasicQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosBasicQueryTests.cs @@ -15,7 +15,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using Cosmos.Scripts; using Microsoft.Azure.Cosmos.Fluent; using Microsoft.Azure.Cosmos.Linq; - using Microsoft.Azure.Cosmos.Query.Core; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents.Collections; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -786,6 +785,58 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests } + //TODO: Remove Ignore flag once emulator is updated to 0415 + [Ignore] + [TestMethod] + public async Task TesOdeTokenCompatibilityWithNonOdePipeline() + { + string query = "select top 200 * from c"; + CosmosClient client = DirectCosmosClient; + Container container = client.GetContainer(DatabaseId, ContainerId); + + // Create items + for (int i = 0; i < 500; i++) + { + await container.CreateItemAsync(ToDoActivity.CreateRandomToDoActivity()); + } + + QueryRequestOptions queryRequestOptions = new QueryRequestOptions + { + MaxItemCount = 50, + EnableOptimisticDirectExecution = true + }; + + FeedIteratorInternal feedIterator = + (FeedIteratorInternal)container.GetItemQueryStreamIterator( + query, + null, + queryRequestOptions); + + ResponseMessage responseMessage = await feedIterator.ReadNextAsync(CancellationToken.None); + string continuationToken = responseMessage.ContinuationToken; + + QueryRequestOptions newQueryRequestOptions = new QueryRequestOptions + { + MaxItemCount = 50, + EnableOptimisticDirectExecution = false + }; + + // use Continuation Token to create new iterator and use same trace + FeedIterator feedIteratorNew = + container.GetItemQueryStreamIterator( + query, + continuationToken, + newQueryRequestOptions); + + while (feedIteratorNew.HasMoreResults) + { + responseMessage = await feedIteratorNew.ReadNextAsync(CancellationToken.None); + } + + string expectedErrorMessage = "The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. "; + Assert.IsTrue(responseMessage.CosmosException.ToString().Contains(expectedErrorMessage)); + } + private class CustomHandler : RequestHandler { string correlatedActivityId; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs index 81c765adc..5c0e66f1a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs @@ -19,6 +19,7 @@ using Microsoft.Azure.Cosmos.Query.Core.Pipeline; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQuery; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; using Microsoft.Azure.Cosmos.Query.Core.QueryClient; using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; @@ -190,6 +191,49 @@ Assert.AreEqual(100, documentCountInSinglePartition); } + [TestMethod] + public async Task TestOdeTokenWithSpecializedPipeline() + { + int numItems = 100; + ParallelContinuationToken parallelContinuationToken = new ParallelContinuationToken( + token: Guid.NewGuid().ToString(), + range: new Documents.Routing.Range("A", "B", true, false)); + + OptimisticDirectExecutionContinuationToken optimisticDirectExecutionContinuationToken = new OptimisticDirectExecutionContinuationToken(parallelContinuationToken); + CosmosElement cosmosElementContinuationToken = OptimisticDirectExecutionContinuationToken.ToCosmosElement(optimisticDirectExecutionContinuationToken); + + OptimisticDirectExecutionTestInput input = CreateInput( + description: @"Single Partition Key and Value Field", + query: "SELECT VALUE COUNT(1) FROM c", + expectedOptimisticDirectExecution: false, + partitionKeyPath: @"/pk", + partitionKeyValue: "a", + continuationToken: cosmosElementContinuationToken); + + DocumentContainer documentContainer = await CreateDocumentContainerAsync(numItems, multiPartition: false); + QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: input.ExpectedOptimisticDirectExecution); + (CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions); + IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create( + documentContainer, + cosmosQueryContextCore, + inputParameters, + NoOpTrace.Singleton); + + string expectedErrorMessage = "The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. "; + + while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) + { + if (queryPipelineStage.Current.Failed) + { + Assert.IsTrue(queryPipelineStage.Current.InnerMostException.ToString().Contains(expectedErrorMessage)); + return; + } + + Assert.IsFalse(true); + break; + } + } + [TestMethod] public async Task TestQueriesWhichNeverRequireDistribution() {