Switch non streaming order by to use flag from query plan instead of _streaming in the response (#4459)

This commit is contained in:
neildsh 2024-04-29 14:17:12 -07:00 коммит произвёл GitHub
Родитель bf2f5ee197
Коммит bda8290201
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
5 изменённых файлов: 63 добавлений и 37 удалений

Просмотреть файл

@ -55,6 +55,8 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
public int MaxConcurrency { get; }
public bool NonStreamingOrderBy { get; }
public InitializationParameters(
IDocumentContainer documentContainer,
SqlQuerySpec sqlQuerySpec,
@ -62,7 +64,8 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
PartitionKey? partitionKey,
IReadOnlyList<OrderByColumn> orderByColumns,
QueryPaginationOptions queryPaginationOptions,
int maxConcurrency)
int maxConcurrency,
bool nonStreamingOrderBy)
{
this.DocumentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
@ -71,6 +74,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
this.OrderByColumns = orderByColumns ?? throw new ArgumentNullException(nameof(orderByColumns));
this.QueryPaginationOptions = queryPaginationOptions ?? throw new ArgumentNullException(nameof(queryPaginationOptions));
this.MaxConcurrency = maxConcurrency;
this.NonStreamingOrderBy = nonStreamingOrderBy;
}
}
@ -181,6 +185,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
IReadOnlyList<OrderByColumn> orderByColumns,
QueryPaginationOptions queryPaginationOptions,
int maxConcurrency,
bool nonStreamingOrderBy,
CosmosElement continuationToken)
{
if (documentContainer == null)
@ -233,24 +238,28 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
partitionKey,
orderByColumns,
queryPaginationOptions,
maxConcurrency);
maxConcurrency,
nonStreamingOrderBy);
return TryCatch<IQueryPipelineStage>.FromResult(new OrderByCrossPartitionQueryPipelineStage(init));
}
private static async ValueTask<(TryCatch<IQueryPipelineStage>, Queue<QueryPage>)> MoveNextAsync_InitializeAsync(InitializationParameters init, ITrace trace, CancellationToken cancellationToken)
private static async ValueTask<(TryCatch<IQueryPipelineStage>, Queue<QueryPage>)> MoveNextAsync_InitializeAsync(
InitializationParameters parameters,
ITrace trace,
CancellationToken cancellationToken)
{
SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec(
init.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
init.SqlQuerySpec.Parameters);
parameters.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
parameters.SqlQuerySpec.Parameters);
List<OrderByQueryPartitionRangePageAsyncEnumerator> uninitializedEnumerators = init.TargetRanges
List<OrderByQueryPartitionRangePageAsyncEnumerator> uninitializedEnumerators = parameters.TargetRanges
.Select(range => OrderByQueryPartitionRangePageAsyncEnumerator.Create(
init.DocumentContainer,
parameters.DocumentContainer,
rewrittenQueryForOrderBy,
new FeedRangeState<QueryState>(range, state: default),
init.PartitionKey,
init.QueryPaginationOptions,
parameters.PartitionKey,
parameters.QueryPaginationOptions,
TrueFilter,
PrefetchPolicy.PrefetchSinglePage))
.ToList();
@ -259,13 +268,12 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
uninitializedEnumerators
.Select(x => (x, (OrderByContinuationToken)null)));
await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, init.MaxConcurrency, trace, cancellationToken);
await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, parameters.MaxConcurrency, trace, cancellationToken);
IReadOnlyList<SortOrder> sortOrders = init.OrderByColumns.Select(column => column.SortOrder).ToList();
IReadOnlyList<SortOrder> sortOrders = parameters.OrderByColumns.Select(column => column.SortOrder).ToList();
PriorityQueue<OrderByQueryPartitionRangePageAsyncEnumerator> initializedEnumerators = new PriorityQueue<OrderByQueryPartitionRangePageAsyncEnumerator>(new OrderByEnumeratorComparer(sortOrders));
Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens = new Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)>();
bool nonStreaming = false;
Queue<QueryPage> bufferedPages = new Queue<QueryPage>();
QueryPageParameters queryPageParameters = null;
while (uninitializedEnumeratorsAndTokens.Count != 0)
@ -278,7 +286,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
if (IsSplitException(enumerator.Current.Exception))
{
await MoveNextAsync_InitializeAsync_HandleSplitAsync(
init.DocumentContainer,
parameters.DocumentContainer,
uninitializedEnumeratorsAndTokens,
enumerator,
token,
@ -307,9 +315,6 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
additionalHeaders: page.AdditionalHeaders);
}
// For backwards compatibility the default value of streaming for ORDER BY is _true_
nonStreaming = nonStreaming || (!page.Streaming.GetValueOrDefault(true) && (page.State != null));
if (enumerator.Current.Result.Enumerator.MoveNext())
{
// the page is non-empty then we need to enqueue the enumerator in the PriorityQueue
@ -335,7 +340,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
}
IQueryPipelineStage pipelineStage;
if (nonStreaming)
if (parameters.NonStreamingOrderBy)
{
Queue<OrderByQueryPartitionRangePageAsyncEnumerator> orderbyEnumerators = new Queue<OrderByQueryPartitionRangePageAsyncEnumerator>();
foreach ((OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken _) in enumeratorsAndTokens)
@ -350,10 +355,10 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
orderbyEnumerators.Enqueue(bufferedEnumerator);
}
await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, init.MaxConcurrency, trace, cancellationToken);
await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, parameters.MaxConcurrency, trace, cancellationToken);
pipelineStage = await NonStreamingOrderByPipelineStage.CreateAsync(
init.QueryPaginationOptions,
parameters.QueryPaginationOptions,
sortOrders,
orderbyEnumerators,
queryPageParameters,
@ -363,12 +368,12 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
else
{
pipelineStage = StreamingOrderByCrossPartitionQueryPipelineStage.Create(
init.DocumentContainer,
parameters.DocumentContainer,
sortOrders,
initializedEnumerators,
enumeratorsAndTokens,
init.QueryPaginationOptions,
init.MaxConcurrency);
parameters.QueryPaginationOptions,
parameters.MaxConcurrency);
}
return (TryCatch<IQueryPipelineStage>.FromResult(pipelineStage), bufferedPages);

Просмотреть файл

@ -78,7 +78,8 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline
.OrderByExpressions
.Zip(queryInfo.OrderBy, (expression, sortOrder) => new OrderByColumn(expression, sortOrder)).ToList(),
queryPaginationOptions: queryPaginationOptions,
maxConcurrency: maxConcurrency,
maxConcurrency: maxConcurrency,
nonStreamingOrderBy: queryInfo.HasNonStreamingOrderBy,
continuationToken: continuationToken);
}
else

Просмотреть файл

@ -44,16 +44,16 @@ namespace Microsoft.Azure.Cosmos.Performance.Tests.Query
[Benchmark(Baseline = true)]
public Task StreamingOrderByPipelineStage()
{
return CreateAndRunPipeline(StreamingContainer);
return CreateAndRunPipeline(StreamingContainer, nonStreamingOrderBy: false);
}
[Benchmark]
public Task NonStreamingOrderByPipelineStage()
{
return CreateAndRunPipeline(NonStreamingContainer);
return CreateAndRunPipeline(NonStreamingContainer, nonStreamingOrderBy: true);
}
private static async Task CreateAndRunPipeline(IDocumentContainer documentContainer)
private static async Task CreateAndRunPipeline(IDocumentContainer documentContainer, bool nonStreamingOrderBy)
{
IReadOnlyList<FeedRangeEpk> ranges = await documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
@ -67,6 +67,7 @@ namespace Microsoft.Azure.Cosmos.Performance.Tests.Query
orderByColumns: OrderByColumns,
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: EndUserPageSize),
maxConcurrency: MaxConcurrency,
nonStreamingOrderBy: nonStreamingOrderBy,
continuationToken: null);
IQueryPipelineStage pipeline = pipelineStage.Result;

Просмотреть файл

@ -228,14 +228,16 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize);
pageSize: pageSize,
nonStreamingOrderBy: true);
IReadOnlyList<CosmosElement> streamingResult = await CreateAndRunPipelineStage(
documentContainer: documentContainer,
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize);
pageSize: pageSize,
nonStreamingOrderBy: false);
if (!streamingResult.SequenceEqual(nonStreamingResult))
{
@ -255,7 +257,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
IReadOnlyList<FeedRangeEpk> ranges,
string queryText,
IReadOnlyList<OrderByColumn> orderByColumns,
int pageSize)
int pageSize,
bool nonStreamingOrderBy)
{
TryCatch<IQueryPipelineStage> pipelineStage = OrderByCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
@ -265,6 +268,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
orderByColumns: orderByColumns,
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: pageSize),
maxConcurrency: MaxConcurrency,
nonStreamingOrderBy: nonStreamingOrderBy,
continuationToken: null);
Assert.IsTrue(pipelineStage.Succeeded);
@ -311,7 +315,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize);
pageSize: pageSize,
nonStreamingOrderBy: true);
DebugTraceHelpers.TraceStreamingPipelineStarting();
IReadOnlyList<CosmosElement> streamingResult = await CreateAndRunPipelineStage(
@ -319,7 +324,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize);
pageSize: pageSize,
nonStreamingOrderBy: false);
if (!streamingResult.SequenceEqual(nonStreamingResult))
{

Просмотреть файл

@ -77,7 +77,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
new OrderByColumn("_ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
}
@ -98,6 +99,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosObject.Create(new Dictionary<string, CosmosElement>()));
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
@ -119,6 +121,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(new List<CosmosElement>()));
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
@ -140,6 +143,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(new List<CosmosElement>() { CosmosString.Create("asdf") }));
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
@ -176,6 +180,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
@ -220,6 +225,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
@ -279,6 +285,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
@ -321,6 +328,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
@ -361,7 +369,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
new OrderByColumn("item2", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
@ -416,7 +425,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 1),
maxConcurrency: 0,
maxConcurrency: 0,
nonStreamingOrderBy: false,
continuationToken: CosmosElement.Parse(continuationToken));
Assert.IsTrue(monadicCreate.Succeeded);
@ -451,7 +461,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
@ -500,7 +511,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
@ -557,7 +569,8 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
new OrderByColumn("c.pk", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: continuationToken);
monadicQueryPipelineStage.ThrowIfFailed();
IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result;