Fix Aggregate Request Charge (#366)
This commit is contained in:
Родитель
fb1c08160a
Коммит
bff1cb90a6
|
@ -1,7 +1,7 @@
|
|||
import { Response } from "../../request";
|
||||
import { AverageAggregator, CountAggregator, MaxAggregator, MinAggregator, SumAggregator } from "../Aggregators";
|
||||
import { ExecutionContext } from "../ExecutionContext";
|
||||
import { getInitialHeader } from "../headerUtils";
|
||||
import { getInitialHeader, mergeHeaders } from "../headerUtils";
|
||||
import { CosmosHeaders } from "../index";
|
||||
|
||||
/** @hidden */
|
||||
|
@ -11,6 +11,7 @@ export class AggregateEndpointComponent implements ExecutionContext {
|
|||
private aggregateValuesIndex: number;
|
||||
private localAggregators: any[];
|
||||
private started: boolean;
|
||||
private respHeaders: CosmosHeaders;
|
||||
|
||||
/**
|
||||
* Represents an endpoint in handling aggregate queries.
|
||||
|
@ -22,6 +23,7 @@ export class AggregateEndpointComponent implements ExecutionContext {
|
|||
// TODO: any
|
||||
this.executionContext = executionContext;
|
||||
this.localAggregators = [];
|
||||
this.respHeaders = getInitialHeader();
|
||||
aggregateOperators.forEach((aggregateOperator: string) => {
|
||||
switch (aggregateOperator) {
|
||||
case "Average":
|
||||
|
@ -83,13 +85,24 @@ export class AggregateEndpointComponent implements ExecutionContext {
|
|||
const { result: item, headers } = await this.executionContext.nextItem();
|
||||
if (item === undefined) {
|
||||
// no more results
|
||||
return { result: this.toArrayTempResources, headers };
|
||||
return { result: this.toArrayTempResources, headers: this.getAndResetActiveResponseHeaders() };
|
||||
}
|
||||
|
||||
this.toArrayTempResources = this.toArrayTempResources.concat(item);
|
||||
this.mergeWithActiveResponseHeaders(headers);
|
||||
return this._getQueryResults();
|
||||
}
|
||||
|
||||
private mergeWithActiveResponseHeaders(headers: CosmosHeaders) {
|
||||
mergeHeaders(this.respHeaders, headers);
|
||||
}
|
||||
|
||||
private getAndResetActiveResponseHeaders() {
|
||||
const ret = this.respHeaders;
|
||||
this.respHeaders = getInitialHeader();
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a provided function on the next element in the AggregateEndpointComponent.
|
||||
* @memberof AggregateEndpointComponent
|
||||
|
@ -106,7 +119,6 @@ export class AggregateEndpointComponent implements ExecutionContext {
|
|||
this.aggregateValuesIndex < this.aggregateValues.length
|
||||
? this.aggregateValues[++this.aggregateValuesIndex]
|
||||
: undefined;
|
||||
|
||||
return { result: resource, headers: resHeaders };
|
||||
}
|
||||
|
||||
|
|
|
@ -57,22 +57,26 @@ describe("Aggregate Query", function() {
|
|||
};
|
||||
|
||||
const validateToArray = async function(queryIterator: QueryIterator<any>, expectedResults: any) {
|
||||
const { resources: results } = await queryIterator.fetchAll();
|
||||
const { resources: results, requestCharge } = await queryIterator.fetchAll();
|
||||
assert(requestCharge > 0, "request charge was not greater than zero");
|
||||
assert.equal(results.length, expectedResults.length, "invalid number of results");
|
||||
assert.equal(queryIterator.hasMoreResults(), false, "hasMoreResults: no more results is left");
|
||||
return requestCharge;
|
||||
};
|
||||
|
||||
const validateExecuteNextAndHasMoreResults = async function(
|
||||
queryIterator: QueryIterator<any>,
|
||||
options: any,
|
||||
expectedResults: any[]
|
||||
expectedResults: any[],
|
||||
fetchAllRequestCharge: number
|
||||
) {
|
||||
const pageSize = options["maxItemCount"];
|
||||
const listOfResultPages: any[] = [];
|
||||
let totalFetchedResults: any[] = [];
|
||||
let totalExecuteNextRequestCharge = 0;
|
||||
|
||||
while (totalFetchedResults.length <= expectedResults.length) {
|
||||
const { resources: results } = await queryIterator.fetchNext();
|
||||
const { resources: results, requestCharge } = await queryIterator.fetchNext();
|
||||
listOfResultPages.push(results);
|
||||
|
||||
if (results === undefined || totalFetchedResults.length === expectedResults.length) {
|
||||
|
@ -80,6 +84,7 @@ describe("Aggregate Query", function() {
|
|||
}
|
||||
|
||||
totalFetchedResults = totalFetchedResults.concat(results);
|
||||
totalExecuteNextRequestCharge += requestCharge;
|
||||
|
||||
if (totalFetchedResults.length < expectedResults.length) {
|
||||
// there are more results
|
||||
|
@ -96,6 +101,14 @@ describe("Aggregate Query", function() {
|
|||
// no more results
|
||||
validateResult(totalFetchedResults, expectedResults);
|
||||
assert.equal(queryIterator.hasMoreResults(), false, "hasMoreResults: no more results is left");
|
||||
|
||||
assert(totalExecuteNextRequestCharge > 0);
|
||||
const percentDifference =
|
||||
Math.abs(fetchAllRequestCharge - totalExecuteNextRequestCharge) / totalExecuteNextRequestCharge;
|
||||
assert(
|
||||
percentDifference <= 0.01,
|
||||
"difference between fetchAll request charge and executeNext request charge should be less than 1%"
|
||||
);
|
||||
};
|
||||
|
||||
const ValidateAsyncIterator = async function(queryIterator: QueryIterator<any>, expectedResults: any[]) {
|
||||
|
@ -118,9 +131,9 @@ describe("Aggregate Query", function() {
|
|||
const options: FeedOptions = { maxDegreeOfParallelism: 2, maxItemCount: 1 };
|
||||
|
||||
const queryIterator = container.items.query(query, options);
|
||||
await validateToArray(queryIterator, expectedResults);
|
||||
const fetchAllRequestCharge = await validateToArray(queryIterator, expectedResults);
|
||||
queryIterator.reset();
|
||||
await validateExecuteNextAndHasMoreResults(queryIterator, options, expectedResults);
|
||||
await validateExecuteNextAndHasMoreResults(queryIterator, options, expectedResults, fetchAllRequestCharge);
|
||||
queryIterator.reset();
|
||||
await ValidateAsyncIterator(queryIterator, expectedResults);
|
||||
};
|
||||
|
|
Загрузка…
Ссылка в новой задаче