Add bufferItems support & other cross-partition perf improvements (#397)

* [WIP] Prefetch pages in defaultQueryExecutionContext

* Always prefetch :)

* [WIP] Switch to fetchFunction

* [WIP]Use fetchFunction for preFetching +logging

* [WIP] Queries are running 2-10x faster

* Fix MinAggregator to handle nulls + update Sample code

* small logger improvements

* Change logger to method based level system

* Add bufferItems option + tests

* clean up

* add bufferItems to sample

* fix maxDegreeOfParallelism to handle 0 properly

* bufferItems should be true in throughput sample

* forceQueryPlan

* direct import uuid/v4
This commit is contained in:
Christopher Anderson 2019-08-19 20:19:43 -07:00 коммит произвёл GitHub
Родитель 26ba008b2c
Коммит 5d5acd7cb0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 523 добавлений и 44 удалений

181
package-lock.json сгенерированный
Просмотреть файл

@ -176,6 +176,11 @@
"integrity": "sha512-VQgHxyPMTj3hIlq9SY1mctqx+Jj8kpQfoLvDlVSDNOyuYs8JYfkuY3OW/4+dO657yPmNhHpePRx0/Tje5ImNVQ==",
"dev": true
},
"@types/debug": {
"version": "4.1.4",
"resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.4.tgz",
"integrity": "sha512-D9MyoQFI7iP5VdpEyPZyjjqIJ8Y8EDNQFIFVLOmeg1rI1xiHOChyUPMPRUVfqFCerxfE+yS3vMyj37F6IdtOoQ=="
},
"@types/estree": {
"version": "0.0.39",
"resolved": "https://registry.npmjs.org/@types/estree/-/estree-0.0.39.tgz",
@ -1043,12 +1048,18 @@
"dev": true
},
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"dev": true,
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz",
"integrity": "sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==",
"requires": {
"ms": "2.0.0"
"ms": "^2.1.1"
},
"dependencies": {
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
}
}
},
"decode-uri-component": {
@ -1206,6 +1217,17 @@
"debug": "~3.1.0",
"engine.io-parser": "~2.1.0",
"ws": "~3.3.1"
},
"dependencies": {
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"dev": true,
"requires": {
"ms": "2.0.0"
}
}
}
},
"engine.io-client": {
@ -1225,6 +1247,17 @@
"ws": "~3.3.1",
"xmlhttprequest-ssl": "~1.5.4",
"yeast": "0.1.2"
},
"dependencies": {
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"dev": true,
"requires": {
"ms": "2.0.0"
}
}
}
},
"engine.io-parser": {
@ -2601,6 +2634,17 @@
"requires": {
"agent-base": "4",
"debug": "3.1.0"
},
"dependencies": {
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"dev": true,
"requires": {
"ms": "2.0.0"
}
}
}
},
"https-proxy-agent": {
@ -2611,6 +2655,23 @@
"requires": {
"agent-base": "^4.1.0",
"debug": "^3.1.0"
},
"dependencies": {
"debug": {
"version": "3.2.6",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz",
"integrity": "sha512-mel+jf7nrtEl5Pn1Qx46zARXKDpBbvzezse7p7LqINmdoIk8PYP5SySaxEmYv6TZ0JyEKA1hsCId6DIhgITtWQ==",
"dev": true,
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==",
"dev": true
}
}
},
"iconv-lite": {
@ -3402,6 +3463,15 @@
"integrity": "sha512-VlfT9F3V0v+jr4yxPc5gg9s62/fIVWsd2Bk2iD435um1NlGMYdVCq+MjcXnhYq2icNOizHr1kK+5TI6H0Hy0ag==",
"dev": true
},
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"dev": true,
"requires": {
"ms": "2.0.0"
}
},
"supports-color": {
"version": "5.4.0",
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.4.0.tgz",
@ -3445,6 +3515,23 @@
"requires": {
"debug": "^3.1.0",
"lodash": "^4.16.4"
},
"dependencies": {
"debug": {
"version": "3.2.6",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz",
"integrity": "sha512-mel+jf7nrtEl5Pn1Qx46zARXKDpBbvzezse7p7LqINmdoIk8PYP5SySaxEmYv6TZ0JyEKA1hsCId6DIhgITtWQ==",
"dev": true,
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==",
"dev": true
}
}
},
"ms": {
@ -3686,7 +3773,7 @@
},
"os-homedir": {
"version": "1.0.2",
"resolved": "http://registry.npmjs.org/os-homedir/-/os-homedir-1.0.2.tgz",
"resolved": "https://registry.npmjs.org/os-homedir/-/os-homedir-1.0.2.tgz",
"integrity": "sha1-/7xJiDNuDoM94MFox+8VISGqf7M=",
"dev": true
},
@ -3724,6 +3811,23 @@
"pac-resolver": "^3.0.0",
"raw-body": "^2.2.0",
"socks-proxy-agent": "^4.0.1"
},
"dependencies": {
"debug": {
"version": "3.2.6",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz",
"integrity": "sha512-mel+jf7nrtEl5Pn1Qx46zARXKDpBbvzezse7p7LqINmdoIk8PYP5SySaxEmYv6TZ0JyEKA1hsCId6DIhgITtWQ==",
"dev": true,
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==",
"dev": true
}
}
},
"pac-resolver": {
@ -3864,6 +3968,23 @@
"pac-proxy-agent": "^3.0.0",
"proxy-from-env": "^1.0.0",
"socks-proxy-agent": "^4.0.1"
},
"dependencies": {
"debug": {
"version": "3.2.6",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz",
"integrity": "sha512-mel+jf7nrtEl5Pn1Qx46zARXKDpBbvzezse7p7LqINmdoIk8PYP5SySaxEmYv6TZ0JyEKA1hsCId6DIhgITtWQ==",
"dev": true,
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==",
"dev": true
}
}
},
"proxy-from-env": {
@ -4346,6 +4467,17 @@
"socket.io-adapter": "~1.1.0",
"socket.io-client": "2.1.1",
"socket.io-parser": "~3.2.0"
},
"dependencies": {
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"dev": true,
"requires": {
"ms": "2.0.0"
}
}
}
},
"socket.io-adapter": {
@ -4374,6 +4506,17 @@
"parseuri": "0.0.5",
"socket.io-parser": "~3.2.0",
"to-array": "0.1.4"
},
"dependencies": {
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"dev": true,
"requires": {
"ms": "2.0.0"
}
}
}
},
"socket.io-parser": {
@ -4387,6 +4530,15 @@
"isarray": "2.0.1"
},
"dependencies": {
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"dev": true,
"requires": {
"ms": "2.0.0"
}
},
"isarray": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/isarray/-/isarray-2.0.1.tgz",
@ -4503,6 +4655,23 @@
"debug": "^3.1.0",
"fs-extra": "^7.0.0",
"lodash": "^4.17.10"
},
"dependencies": {
"debug": {
"version": "3.2.6",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz",
"integrity": "sha512-mel+jf7nrtEl5Pn1Qx46zARXKDpBbvzezse7p7LqINmdoIk8PYP5SySaxEmYv6TZ0JyEKA1hsCId6DIhgITtWQ==",
"dev": true,
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==",
"dev": true
}
}
},
"string_decoder": {

Просмотреть файл

@ -88,9 +88,11 @@
},
"dependencies": {
"@azure/cosmos-sign": "1.0.2",
"@types/debug": "^4.1.4",
"atob": "2.1.2",
"binary-search-bounds": "2.0.3",
"crypto-hash": "1.1.0",
"debug": "^4.1.1",
"fast-json-stable-stringify": "2.0.0",
"node-abort-controller": "1.0.3",
"node-fetch": "2.6.0",

Просмотреть файл

@ -0,0 +1,55 @@
//@ts-check
const { CosmosClient } = require("../..");
// Connection settings for the sample. Each value prefers the QUERY_SCENARIO_-prefixed
// environment variable and falls back to the generic COSMOS_ variable when it is unset or empty.
const endpoint = process.env.QUERY_SCENARIO_COSMOS_ENDPOINT || process.env.COSMOS_ENDPOINT;
const key = process.env.QUERY_SCENARIO_COSMOS_KEY || process.env.COSMOS_KEY;
const dbId = process.env.QUERY_SCENARIO_COSMOS_DB || process.env.COSMOS_DB;
const containerId = process.env.QUERY_SCENARIO_COSMOS_CONTAINER || process.env.COSMOS_CONTAINER;
/**
 * Runs every query scenario sequentially against the configured container and
 * logs how long each one took.
 *
 * A failing scenario does not abort the run: the failure (including the
 * underlying error) is logged and the next scenario is started.
 */
async function run() {
  const client = new CosmosClient({
    endpoint,
    key
  });
  const query1 = "Select * from c order by c._ts";
  const query2 = "Select * from c";
  const query3 = "Select value count(c.id) from c";
  const container = client.database(dbId).container(containerId);
  // The same options object is shared by all scenarios.
  const options = {
    maxItemCount: 10000,
    maxDegreeOfParallelism: 1000,
    bufferItems: true
  };
  const scenarios = [
    { container, query: query1, options },
    { container, query: query2, options },
    { container, query: query3, options }
  ];
  for (const scenario of scenarios) {
    try {
      console.log("Scenario starting: " + scenario.query);
      const start = Date.now();
      await runScenario(scenario.container, scenario.query, scenario.options);
      console.log('Scenario complete: "' + scenario.query + '" - took ' + (Date.now() - start) / 1000 + "s");
    } catch (e) {
      console.log("Scenario failed: " + scenario.query + " - " + JSON.stringify(scenario.options));
      // Surface the actual cause; without it a failed scenario is impossible to diagnose.
      console.error(e);
    }
  }
}
async function runScenario(container, query, options) {
const queryIterator = container.items.query(query, options);
let count = 0;
while (queryIterator.hasMoreResults() && count <= 100000) {
const { resources: results } = await queryIterator.fetchNext();
if (results != undefined) {
count = count + results.length;
}
}
}
// Start the scenarios; a rejection from run() is printed via console.error.
run().catch(console.error);

Просмотреть файл

@ -1,7 +1,9 @@
import uuid from "uuid/v4";
import { PartitionKeyRange } from "./client/Container/PartitionKeyRange";
import { Resource } from "./client/Resource";
import { Constants, HTTPMethod, OperationType, ResourceType } from "./common/constants";
import { getIdFromLink, getPathFromLink, parseLink } from "./common/helper";
import { logger } from "./common/logger";
import { StatusCodes, SubStatusCodes } from "./common/statusCodes";
import { CosmosClientOptions } from "./CosmosClientOptions";
import { ConnectionPolicy, ConsistencyLevel, DatabaseAccount, PartitionKey } from "./documents";
@ -10,8 +12,8 @@ import { executePlugins, PluginOn } from "./plugins/Plugin";
import { FetchFunctionCallback, SqlQuerySpec } from "./queryExecutionContext";
import { CosmosHeaders } from "./queryExecutionContext/CosmosHeaders";
import { QueryIterator } from "./queryIterator";
import { FeedOptions, RequestOptions, Response } from "./request";
import { ErrorResponse } from "./request";
import { FeedOptions, RequestOptions, Response } from "./request";
import { PartitionedQueryExecutionInfo } from "./request/ErrorResponse";
import { getHeaders } from "./request/request";
import { RequestContext } from "./request/RequestContext";
@ -19,6 +21,9 @@ import { request as executeRequest } from "./request/RequestHandler";
import { SessionContainer } from "./session/sessionContainer";
import { SessionContext } from "./session/SessionContext";
/** @hidden */
const log = logger("ClientContext");
/**
* @hidden
* @ignore
@ -122,7 +127,7 @@ export class ClientContext {
plugins: this.cosmosClientOptions.plugins,
partitionKey
};
const requestId = uuid();
if (query !== undefined) {
request.method = HTTPMethod.post;
}
@ -136,7 +141,13 @@ export class ClientContext {
}
}
this.applySessionToken(request);
log.info(
"query " + requestId + " started" + (request.partitionKeyRangeId ? " pkrid: " + request.partitionKeyRangeId : "")
);
log.silly(request);
const start = Date.now();
const response = await executeRequest(request);
log.info("query " + requestId + " finished - " + (Date.now() - start) + "ms");
this.captureSessionToken(undefined, path, OperationType.Query, response.headers);
return this.processQueryFeedResponse(response, !!query, resultFn);
}

31
src/common/logger.ts Normal file
Просмотреть файл

@ -0,0 +1,31 @@
import debugLib from "debug";
/**
 * Level filter taken from the COSMOS_LOG_LEVEL environment variable, falling back to "warn|error".
 * A message is emitted only when its level name occurs in this string (see levelLogger).
 * @hidden
 */
const cosmosLevelFilter = process.env.COSMOS_LOG_LEVEL || "warn|error";
/**
 * Root `debug` logger for the "cosmos" namespace; per-module loggers are derived from it via `extend`.
 * @hidden
 */
const cosmosDebug = debugLib("cosmos");
/**
 * Names of the supported log levels.
 * @hidden
 */
type logLevel = "silly" | "debug" | "info" | "warn" | "error";
/**
 * Builds a logging function bound to one namespace logger and one level.
 * The returned function drops the message unless the level name occurs in the
 * configured level filter; otherwise it forwards the message, prefixed with an
 * ISO timestamp and the level, to the namespace logger.
 * @hidden
 */
const levelLogger = (namespaceLogger: debugLib.Debugger, level: logLevel) => {
  return (message: string | { [key: string]: any }) => {
    if (!cosmosLevelFilter.includes(level)) {
      return;
    }
    const timestamp = new Date().toISOString();
    const format = "[" + timestamp + "][" + level + "]: %o";
    namespaceLogger(format, message);
  };
};
/**
 * Creates a logger for the given namespace, nested under the root "cosmos" logger.
 * The result exposes one method per log level: silly, debug, info, warn and error.
 * @hidden
 */
export const logger = (namespace: string) => {
  const scoped = cosmosDebug.extend(namespace);
  const forLevel = (level: logLevel) => levelLogger(scoped, level);
  return {
    silly: forLevel("silly"),
    debug: forLevel("debug"),
    info: forLevel("info"),
    warn: forLevel("warn"),
    error: forLevel("error")
  };
};

Просмотреть файл

@ -22,10 +22,12 @@ export class MinAggregator implements IAggregator<number> {
*/
public aggregate(other: number) {
if (this.value === undefined) {
// || typeof this.value === "object"
this.value = other;
} else {
const otherType = other == null ? "NoValue" : typeof other;
if (this.comparer.compareValue(other, otherType, this.value, typeof this.value) < 0) {
const otherType = other === null ? "NoValue" : typeof other; // || typeof other === "object"
const thisType = this.value === null ? "NoValue" : typeof this.value;
if (this.comparer.compareValue(other, otherType, this.value, thisType) < 0) {
this.value = other;
}
}

Просмотреть файл

@ -1,11 +1,15 @@
import { Constants } from "../common";
import { logger } from "../common/logger";
import { ClientSideMetrics, QueryMetrics } from "../queryMetrics";
import { Response } from "../request";
import { FeedOptions, Response } from "../request";
import { getInitialHeader } from "./headerUtils";
import { ExecutionContext } from "./index";
/** @hidden */
export type FetchFunctionCallback = (options: any) => Promise<Response<any>>;
const log = logger("defaultQueryExecutionContext");
/** @hidden */
export type FetchFunctionCallback = (options: FeedOptions) => Promise<Response<any>>;
/** @hidden */
enum STATES {
@ -17,13 +21,14 @@ enum STATES {
/** @hidden */
export class DefaultQueryExecutionContext implements ExecutionContext {
private static readonly STATES = STATES;
private resources: any; // TODO: any resources
private resources: any[]; // TODO: any resources
private currentIndex: number;
private currentPartitionIndex: number;
private fetchFunctions: FetchFunctionCallback[];
private options: any; // TODO: any options
public continuation: any; // TODO: any continuation
private options: FeedOptions; // TODO: any options
public continuation: string; // TODO: any continuation
private state: STATES;
private nextFetchFunction: Promise<Response<any>>;
/**
* Provides the basic Query Execution Context.
* This wraps the internal logic query execution using provided fetch functions
@ -109,6 +114,11 @@ export class DefaultQueryExecutionContext implements ExecutionContext {
);
}
/**
* Fetches the next batch of the feed and pass them as an array to a callback
* @memberof DefaultQueryExecutionContext
* @instance
*/
/**
* Fetches the next batch of the feed and pass them as an array to a callback
* @memberof DefaultQueryExecutionContext
@ -128,13 +138,33 @@ export class DefaultQueryExecutionContext implements ExecutionContext {
return { headers: getInitialHeader(), result: undefined };
}
const fetchFunction = this.fetchFunctions[this.currentPartitionIndex];
let resources;
let responseHeaders;
try {
const response = await fetchFunction(this.options);
let p: Promise<Response<any>>;
if (this.nextFetchFunction !== undefined) {
log.debug("using prefetch");
p = this.nextFetchFunction;
this.nextFetchFunction = undefined;
} else {
log.debug("using fresh fetch");
p = this.fetchFunctions[this.currentPartitionIndex](this.options);
}
const response = await p;
resources = response.result;
responseHeaders = response.headers;
this.continuation = responseHeaders[Constants.HttpHeaders.Continuation];
if (!this.continuation) {
++this.currentPartitionIndex;
}
if (this.options && this.options.bufferItems === true) {
const fetchFunction = this.fetchFunctions[this.currentPartitionIndex];
this.nextFetchFunction = fetchFunction
? fetchFunction({ ...this.options, continuation: this.continuation })
: undefined;
}
} catch (err) {
this.state = DefaultQueryExecutionContext.STATES.ended;
// return callback(err, undefined, responseHeaders);
@ -142,11 +172,6 @@ export class DefaultQueryExecutionContext implements ExecutionContext {
throw err;
}
this.continuation = responseHeaders[Constants.HttpHeaders.Continuation];
if (!this.continuation) {
++this.currentPartitionIndex;
}
this.state = DefaultQueryExecutionContext.STATES.inProgress;
this.currentIndex = 0;
this.options.continuation = originalContinuation;

Просмотреть файл

@ -17,6 +17,7 @@ export class DocumentProducer {
private err: Error;
public previousContinuationToken: string;
public continuationToken: string;
public generation: number = 0;
private respHeaders: CosmosHeaders;
private internalExecutionContext: DefaultQueryExecutionContext;
@ -150,6 +151,7 @@ export class DocumentProducer {
try {
const { result: resources, headers: headerResponse } = await this.internalExecutionContext.fetchMore();
++this.generation;
this._updateStates(undefined, resources === undefined);
if (resources !== undefined) {
// some more results

Просмотреть файл

@ -75,6 +75,9 @@ export class OrderByDocumentProducerComparator {
// TODO: This smells funny
public compareValue(item1: any, type1: string, item2: any, type2: string) {
if (type1 === "object" || type2 === "object") {
throw new Error("Tried to compare an object type");
}
const type1Ord = TYPEORDCOMPARATOR[type1].ord;
const type2Ord = TYPEORDCOMPARATOR[type2].ord;
const typeCmp = type1Ord - type2Ord;

Просмотреть файл

@ -36,8 +36,6 @@ export class ParallelQueryExecutionContext extends ParallelQueryExecutionContext
* @ignore
*/
public documentProducerComparator(docProd1: DocumentProducer, docProd2: DocumentProducer) {
const a = docProd1.getTargetParitionKeyRange()["minInclusive"];
const b = docProd2.getTargetParitionKeyRange()["minInclusive"];
return a === b ? 0 : a > b ? 1 : -1;
return docProd1.generation - docProd2.generation;
}
}

Просмотреть файл

@ -2,8 +2,9 @@ import * as bs from "binary-search-bounds";
import PriorityQueue from "priorityqueuejs";
import semaphore from "semaphore";
import { ClientContext } from "../ClientContext";
import { logger } from "../common/logger";
import { StatusCodes, SubStatusCodes } from "../common/statusCodes";
import { Response } from "../request";
import { FeedOptions, Response } from "../request";
import { PartitionedQueryExecutionInfo } from "../request/ErrorResponse";
import { QueryRange } from "../routing/QueryRange";
import { PARITIONKEYRANGE, SmartRoutingMapProvider } from "../routing/smartRoutingMapProvider";
@ -12,6 +13,9 @@ import { DocumentProducer } from "./documentProducer";
import { ExecutionContext } from "./ExecutionContext";
import { getInitialHeader, mergeHeaders } from "./headerUtils";
/** @hidden */
const log = logger("parallelQueryExecutionContextBase");
/** @hidden */
export enum ParallelQueryExecutionContextBaseStates {
started = "started",
@ -52,7 +56,7 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
private clientContext: ClientContext,
private collectionLink: string,
private query: any, // TODO: any - It's not SQLQuerySpec
private options: any,
private options: FeedOptions,
private partitionedQueryExecutionInfo: PartitionedQueryExecutionInfo
) {
this.clientContext = clientContext;
@ -91,11 +95,22 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
try {
const targetPartitionRanges = await this._onTargetPartitionRanges();
this.waitingForInternalExecutionContexts = targetPartitionRanges.length;
// default to 1 if none is provided.
// default to 1 if 0 or undefined is provided.
const maxDegreeOfParallelism =
options.maxDegreeOfParallelism > 0
? Math.min(options.maxDegreeOfParallelism, targetPartitionRanges.length)
: targetPartitionRanges.length;
options.maxDegreeOfParallelism === 0 || options.maxDegreeOfParallelism === undefined
? 1
: // use maximum parallelism if -1 (or less) is provided
options.maxDegreeOfParallelism > 0
? Math.min(options.maxDegreeOfParallelism + 1, targetPartitionRanges.length)
: targetPartitionRanges.length;
log.info(
"Query starting against " +
targetPartitionRanges.length +
" ranges with parallelism of " +
maxDegreeOfParallelism
);
const parallelismSem = semaphore(maxDegreeOfParallelism);
let filteredPartitionKeyRanges = [];

Просмотреть файл

@ -26,6 +26,7 @@ export class QueryIterator<T> {
private fetchAllLastResHeaders: CosmosHeaders;
private queryExecutionContext: ExecutionContext;
private queryPlanPromise: Promise<Response<PartitionedQueryExecutionInfo>>;
private isInitialied: boolean;
/**
* @hidden
*/
@ -43,6 +44,7 @@ export class QueryIterator<T> {
this.resourceLink = resourceLink;
this.fetchAllLastResHeaders = getInitialHeader();
this.reset();
this.isInitialied = false;
}
/**
@ -121,6 +123,10 @@ export class QueryIterator<T> {
*/
public async fetchNext(): Promise<FeedResponse<T>> {
this.queryPlanPromise = this.fetchQueryPlan();
if (!this.isInitialied) {
await this.init();
}
let response: Response<any>;
try {
response = await this.queryExecutionContext.fetchMore();
@ -145,7 +151,9 @@ export class QueryIterator<T> {
private async toArrayImplementation(): Promise<FeedResponse<T>> {
this.queryPlanPromise = this.fetchQueryPlan();
if (!this.isInitialied) {
await this.init();
}
while (this.queryExecutionContext.hasMoreResults()) {
let response: Response<any>;
try {
@ -217,4 +225,21 @@ export class QueryIterator<T> {
error.substatus === SubStatusCodes.CrossPartitionQueryNotServable
);
}
private initPromise: Promise<void>;
/**
 * Runs the one-time initialization, sharing a single in-flight promise between
 * concurrent callers. Resolves immediately once initialization has completed.
 */
private async init() {
  if (this.isInitialied === true) {
    return;
  }
  // Start initialization only when no earlier call has started it. The previous
  // check was inverted (`if (this.initPromise)`), so `_init()` was never invoked.
  if (this.initPromise === undefined) {
    this.initPromise = this._init();
  }
  return this.initPromise;
}
/**
 * Performs the actual initialization work: when `forceQueryPlan` is exactly
 * `true`, creates the pipelined execution context first, then marks the
 * iterator as initialized.
 */
private async _init() {
  const { forceQueryPlan } = this.options;
  if (forceQueryPlan === true) {
    await this.createPipelinedExecutionContext();
  }
  this.isInitialied = true;
}
}

Просмотреть файл

@ -4,25 +4,43 @@ import { SharedOptions } from "./SharedOptions";
* The feed options and query methods.
*/
export interface FeedOptions extends SharedOptions {
/** Opaque token for continuing the enumeration. */
/** Opaque token for continuing the enumeration. Default: undefined */
continuation?: string;
/**
* Limits the size of the continuation token in the response. Default: undefined
*
* Continuation Tokens contain optional data that can be removed from the serialization before writing it out to a header.
* By default we are capping this to 1kb to avoid long headers (Node.js has a global header size limit).
* A user may set this field to allow for longer headers, which can help the backend optimize query execution.
*/
continuationTokenLimitInKB?: number;
/** Allow scan on the queries which couldn't be served as indexing was opted out on the requested paths. */
/**
* Allow scan on the queries which couldn't be served as indexing was opted out on the requested paths. Default: false
*
* In general, it is best to avoid using this setting. Scans are relatively expensive and take a long time to serve.
*/
enableScanInQuery?: boolean;
/**
* The maximum number of concurrent operations that run client side during parallel query execution in the
* Azure Cosmos DB database service. Negative values make the system automatically decide the number of
* concurrent operations to run.
* concurrent operations to run. Default: 0 (no parallelism)
*/
maxDegreeOfParallelism?: number;
/** Max number of items to be returned in the enumeration operation. */
/**
* Max number of items to be returned in the enumeration operation. Default: undefined (server will define payload)
*
* Experimenting with this value can usually result in the biggest performance changes to the query.
*
* The smaller the item count, the faster the first result will be delivered (for non-aggregates). For larger amounts,
* it will take longer to serve the request, but you'll usually get better throughput for large queries (i.e. if you need 1000 items
* before you can do any other actions, set `maxItemCount` to 1000. If you can start doing work after the first 100, set `maxItemCount` to 100.)
*/
maxItemCount?: number;
/** Indicates a change feed request. Must be set to "Incremental feed", or omitted otherwise. */
/**
* Note: consider using readChangeFeed instead.
*
* Indicates a change feed request. Must be set to "Incremental feed", or omitted otherwise. Default: false
*/
useIncrementalFeed?: boolean;
/** Conditions Associated with the request. */
accessCondition?: {
@ -31,6 +49,34 @@ export interface FeedOptions extends SharedOptions {
/** Conditional HTTP method header value (the _etag field from the last version you read). */
condition: string;
};
/** Enable returning query metrics in response headers */
/**
* Enable returning query metrics in response headers. Default: false
*
* Used for debugging slow or expensive queries. Also increases response size and if you're using a low max header size in Node.js,
* you can run into issues faster.
*/
populateQueryMetrics?: boolean;
/**
* Enable buffering additional items during queries. Default: false
*
* This will buffer an additional page at a time (multiplied by maxDegreeOfParallelism) from the server in the background.
* This improves latency by fetching pages before they are needed by the client. If you're draining all of the results from the
* server, like `.fetchAll`, you should usually enable this. If you're only fetching one page at a time via continuation token,
* you should avoid this. If you're draining more than one page, but not the entire result set, it may help improve latency, but
* it will increase the total amount of RU/s used to serve the entire query (as some pages will be fetched more than once).
*/
bufferItems?: boolean;
/**
* This setting forces the query to use a query plan. Default: false
*
* Note: this will disable continuation token support, even for single partition queries.
*
* For queries like aggregates and most cross partition queries, this happens anyway.
* However, since the library doesn't know what type of query it is until we get back the first response,
* some optimization can't happen until later.
*
* If this setting is enabled, it will force query plan for the query, which will save some network requests
* and ensure parallelism can happen. Useful for when you know you're doing cross-partition or aggregate queries.
*/
forceQueryPlan?: boolean;
}

Просмотреть файл

@ -2,6 +2,7 @@ import AbortController from "node-abort-controller";
import fetch, { RequestInit, Response } from "node-fetch";
import { trimSlashes } from "../common";
import { Constants } from "../common/constants";
import { logger } from "../common/logger";
import { executePlugins, PluginOn } from "../plugins/Plugin";
import * as RetryUtility from "../retry/retryUtility";
import { defaultHttpAgent, defaultHttpsAgent } from "./defaultAgent";
@ -11,6 +12,9 @@ import { RequestContext } from "./RequestContext";
import { Response as CosmosResponse } from "./Response";
import { TimeoutError } from "./TimeoutError";
/** @hidden */
const log = logger("RequestHandler");
/** @hidden */
export async function executeRequest(requestContext: RequestContext) {
return executePlugins(requestContext, httpRequest, PluginOn.request);
@ -85,6 +89,8 @@ async function httpRequest(requestContext: RequestContext) {
if (response.status >= 400) {
const errorResponse: ErrorResponse = new Error(result.message);
log.warn(response.status + " " + requestContext.endpoint + " " + requestContext.path + " " + result.message);
errorResponse.code = response.status;
errorResponse.body = result;
errorResponse.headers = headers;

Просмотреть файл

@ -5,7 +5,7 @@ import { DataType, IndexKind } from "../../dist-esm/documents";
import { SqlQuerySpec } from "../../dist-esm/queryExecutionContext";
import { QueryIterator } from "../../dist-esm/queryIterator";
import { bulkInsertItems, getTestContainer, removeAllDatabases } from "../common/TestHelpers";
import { FeedResponse } from "../../dist-esm";
import { FeedResponse, FeedOptions } from "../../dist-esm";
function compare(key: string) {
return function(a: any, b: any): number {
@ -241,9 +241,10 @@ describe("Cross Partition", function() {
it("Validate Parallel Query As String With maxDegreeOfParallelism: -1", async function() {
// simple order by query in string format
const query = "SELECT * FROM root r";
const options = {
const options: FeedOptions = {
maxItemCount: 2,
maxDegreeOfParallelism: -1,
forceQueryPlan: true,
populateQueryMetrics: true
};
@ -272,9 +273,10 @@ describe("Cross Partition", function() {
it("Validate Parallel Query As String With maxDegreeOfParallelism: 3", async function() {
// simple order by query in string format
const query = "SELECT * FROM root r";
const options = {
const options: FeedOptions = {
maxItemCount: 2,
maxDegreeOfParallelism: 3
maxDegreeOfParallelism: 3,
bufferItems: true
};
// validates the results size and order
@ -377,7 +379,8 @@ describe("Cross Partition", function() {
const query = "SELECT DISTINCT VALUE r.spam3 FROM root r order by r.spam3";
const options = {
maxItemCount: 2,
maxDegreeOfParallelism: 3
maxDegreeOfParallelism: 3,
bufferItems: true
};
const expectedOrderedIds = ["eggs0", "eggs1", "eggs2"];
@ -549,7 +552,9 @@ describe("Cross Partition", function() {
const query = util.format("SELECT top %d * FROM root r", topCount);
const options = {
maxItemCount: 2,
maxDegreeOfParallelism: 3
maxDegreeOfParallelism: 3,
forceQueryPlan: true,
bufferItems: true
};
// prepare expected behaviour verifier

Просмотреть файл

@ -0,0 +1,84 @@
import { FetchFunctionCallback, DefaultQueryExecutionContext } from "../../dist-esm/queryExecutionContext";
import { FeedOptions } from "../../dist-esm";
import assert from "assert";
import { sleep, Constants } from "../../dist-esm/common";
describe("defaultQueryExecutionContext", function() {
  // Builds a fetch stub that always returns one item plus a continuation token,
  // and records how many times it has been invoked.
  const createCountingFetch = () => {
    const counter = { calls: 0 };
    const fetchFunction: FetchFunctionCallback = async () => {
      counter.calls++;
      return {
        code: 200,
        headers: {
          "x-ms-continuation": "any random text"
        },
        result: [
          {
            item: "foo"
          }
        ],
        substatus: 0
      };
    };
    return { counter, fetchFunction };
  };

  it("should not buffer items if bufferItems is false", async function() {
    const { counter, fetchFunction } = createCountingFetch();
    const options: FeedOptions = {
      bufferItems: false
    };
    const context = new DefaultQueryExecutionContext(options, fetchFunction);
    assert.strictEqual(counter.calls, 0, "Nothing should be fetched at this point");

    await context.fetchMore();
    // Yield the event loop briefly so any background fetch gets a chance to run.
    await sleep(10);
    assert.strictEqual(counter.calls, 1, "Should have only fetched 1 page");

    await context.fetchMore();
    // Yield the event loop briefly so any background fetch gets a chance to run.
    await sleep(10);
    assert.strictEqual(counter.calls, 2, "Should have only fetched 2 pages");
  });

  it("should buffer items if bufferItems is true", async function() {
    const { counter, fetchFunction } = createCountingFetch();
    const options: FeedOptions = {
      bufferItems: true
    };
    const context = new DefaultQueryExecutionContext(options, fetchFunction);
    assert.strictEqual(counter.calls, 0, "Nothing should be fetched at this point");

    await context.fetchMore();
    // Yield the event loop briefly so the prefetch of the next page gets a chance to run.
    await sleep(10);
    assert.strictEqual(counter.calls, 2, "Should have fetched 2 pages (one buffered)");

    await context.fetchMore();
    // Yield the event loop briefly so the prefetch of the next page gets a chance to run.
    await sleep(10);
    assert.strictEqual(counter.calls, 3, "Should have only fetched 3 pages (one buffered)");
  });
});