Add change feed support (#196)
* Change feed support
* typos, PR feedback, fix StartTime test coverage
* add retries to flaky test
* Adds jsdoc, changes response type, and adjusts feed options
* Add change feed sample
* PR feedback
* pr feedback
* remove partitionkeyrange support and dedupe PK
* Fix session test
This commit is contained in:
Parent: 0f5fc76f49
Commit: 2ac677b7db
@@ -0,0 +1,157 @@
// @ts-check
"use strict";

const cosmos = require("../../lib/");
const CosmosClient = cosmos.CosmosClient;
const config = require("../Shared/config");
const databaseId = config.names.database;
const containerId = config.names.container;

const endpoint = config.connection.endpoint;
const masterKey = config.connection.authKey;

// Establish a new instance of the CosmosClient to be used throughout this demo
const client = new CosmosClient({ endpoint, auth: { masterKey } });

// We'll use the same pk value for all these samples
const pk = "0";

function doesMatch(actual, expected) {
  if (actual.length !== expected.length) {
    return "❌";
  }
  for (let i = 0; i < actual.length; i++) {
    if (actual[i] !== expected[i]) {
      return "❌";
    }
  }
  return "✅";
}

function logResult(scenario, expected, actual) {
  const status = doesMatch(actual, expected);
  console.log(`  ${status} ${scenario} - expected: [${expected.join(", ")}] - actual: [${actual.join(", ")}]`);
}

async function run() {
  const container = await init();

  try {
    console.log(`
✨✨✨ Change Feed Samples ✨✨✨

There are 4 scenarios for change feed:
1. Start from a specific continuation
2. Start from a specific point in time
3. Start from the beginning
4. Start from now

All 4 scenarios will eventually catch up to each other if read for long enough.

In this sample, we expect each scenario to see the following items, by id:
1. [3]
2. [2, 3]
3. [1, 2, 3]
4. []

After we've read to this point, if we insert a new item with id 4, we expect all of them to see it, since they will all be caught up.
`);

    console.log("📢 Phase 1: All scenarios see different results");

    await container.items.create({ id: "1", pk });
    console.log(" 👉 Inserted id=1");

    const now = new Date();
    console.log(" 👉 Saved timestamp for the specific point in time scenario");
    const { headers } = await container.items.create({ id: "2", pk });
    const lsn = headers["lsn"];
    console.log(` 👉 Inserted id=2 after timestamp with LSN of ${lsn}`);

    await container.items.create({ id: "3", pk });
    console.log(" 👉 Inserted id=3");

    const specificContinuationIterator = container.items.readChangeFeed(pk, { continuation: lsn });
    const specificPointInTimeIterator = container.items.readChangeFeed(pk, { startTime: now });
    const fromBeginningIterator = container.items.readChangeFeed(pk, { startFromBeginning: true });
    const fromNowIterator = container.items.readChangeFeed(pk, {});

    const { result: specificContinuationResult } = await specificContinuationIterator.executeNext();
    logResult("initial specific continuation scenario", [3], specificContinuationResult.map(v => parseInt(v.id)));

    // First page is empty. It is catching up to a valid continuation.
    const { result: shouldBeEmpty } = await specificPointInTimeIterator.executeNext();
    logResult(
      "initial specific point in time scenario should be empty while it finds the right continuation",
      [],
      shouldBeEmpty.map(v => parseInt(v.id))
    );
    // Second page should have results
    const { result: specificPointInTimeResults } = await specificPointInTimeIterator.executeNext();
    logResult(
      "second specific point in time scenario should have caught up now",
      [2, 3],
      specificPointInTimeResults.map(v => parseInt(v.id))
    );

    const { result: fromBeginningResults } = await fromBeginningIterator.executeNext();
    logResult("initial from beginning scenario", [1, 2, 3], fromBeginningResults.map(v => parseInt(v.id)));

    const { result: fromNowResultsShouldBeEmpty } = await fromNowIterator.executeNext();
    logResult("initial from now scenario should be empty", [], fromNowResultsShouldBeEmpty.map(v => parseInt(v.id)));

    // Now they should all be caught up to the point after id=3, so if we insert an id=4, they should all get it.
    console.log("📢 Phase 2: All scenarios are caught up and should see the same results");

    await container.items.create({ id: "4", pk });
    console.log(" 👉 Inserted id=4 - all scenarios should see this");

    const { result: specificContinuationResult2 } = await specificContinuationIterator.executeNext();
    logResult(
      "after insert, specific continuation scenario",
      [4],
      specificContinuationResult2.map(v => parseInt(v.id))
    );

    const { result: specificPointInTimeResults2 } = await specificPointInTimeIterator.executeNext();
    logResult(
      "after insert, specific point in time scenario",
      [4],
      specificPointInTimeResults2.map(v => parseInt(v.id))
    );

    const { result: fromBeginningResults2 } = await fromBeginningIterator.executeNext();
    logResult("after insert, from beginning scenario", [4], fromBeginningResults2.map(v => parseInt(v.id)));

    const { result: fromNowResults2 } = await fromNowIterator.executeNext();
    logResult("after insert, from now scenario", [4], fromNowResults2.map(v => parseInt(v.id)));
  } catch (err) {
    handleError(err);
  } finally {
    await finish(container);
  }
}

async function init() {
  const { database } = await client.databases.createIfNotExists({ id: databaseId });
  const { container } = await database.containers.createIfNotExists({
    id: containerId,
    partitionKey: { kind: "Hash", paths: ["/pk"] }
  });
  return container;
}

async function handleError(error) {
  console.log("\nAn error with code '" + error.code + "' has occurred:");
  console.log("\t" + error);
}

async function finish(container) {
  try {
    await container.database.delete();
    console.log("\nEnd of demo.");
  } catch (err) {
    console.log(`Database[${databaseId}] might not have been deleted properly. You might need to delete it manually.`);
  }
}

run().catch(handleError);
@@ -0,0 +1,11 @@
{
  "name": "cosmos-change-feed",
  "version": "0.0.0",
  "private": true,
  "description": "A sample showing usage of the change feed in Cosmos DB",
  "main": "app.js",
  "dependencies": {},
  "scripts": {
    "start": "node app.js"
  }
}
@@ -5,6 +5,10 @@
    "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
};

if (exports.connection.endpoint.includes("https://localhost")) {
  process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
}

exports.names = {
  database: "NodeSamples",
  container: "Data"
@@ -0,0 +1,141 @@
/// <reference lib="esnext.asynciterable" />
import { isNumber } from "util";
import { ChangeFeedOptions } from "./ChangeFeedOptions";
import { ChangeFeedResponse } from "./ChangeFeedResponse";
import { Resource } from "./client";
import { ClientContext } from "./ClientContext";
import { Constants, ResourceType, StatusCodes } from "./common";
import { FeedOptions, Response } from "./request";

/**
 * Provides an iterator for the change feed.
 *
 * Use `Items.readChangeFeed()` to get an instance of the iterator.
 */
export class ChangeFeedIterator<T> {
  private static readonly IfNoneMatchAllHeaderValue = "*";
  private nextIfNoneMatch: string;
  private ifModifiedSince: string;
  private lastStatusCode: number;
  private isPartitionSpecified: boolean;

  /**
   * @internal
   * @hidden
   *
   * @param clientContext
   * @param resourceId
   * @param resourceLink
   * @param isPartitionedContainer
   * @param changeFeedOptions
   */
  constructor(
    private clientContext: ClientContext,
    private resourceId: string,
    private resourceLink: string,
    private partitionKey: string | number | boolean,
    private isPartitionedContainer: () => Promise<boolean>,
    private changeFeedOptions: ChangeFeedOptions
  ) {
    // partition key XOR partition key range id
    const partitionKeyValid = partitionKey !== undefined;
    this.isPartitionSpecified = partitionKeyValid;

    let canUseStartFromBeginning = true;
    if (changeFeedOptions.continuation) {
      this.nextIfNoneMatch = changeFeedOptions.continuation;
      canUseStartFromBeginning = false;
    }

    if (changeFeedOptions.startTime) {
      // .toUTCString() is platform specific, but most platforms use RFC 1123.
      // In ECMAScript 2018, this was standardized to RFC 1123.
      // See for more info: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toUTCString
      this.ifModifiedSince = changeFeedOptions.startTime.toUTCString();
      canUseStartFromBeginning = false;
    }

    if (canUseStartFromBeginning && !changeFeedOptions.startFromBeginning) {
      this.nextIfNoneMatch = ChangeFeedIterator.IfNoneMatchAllHeaderValue;
    }
  }

  /**
   * Gets a value indicating whether there are potentially additional results that can be retrieved.
   *
   * Initially returns true. This value is set based on whether the last execution returned a continuation token.
   *
   * @returns Boolean value representing whether there are potentially additional results that can be retrieved.
   */
  get hasMoreResults(): boolean {
    return this.lastStatusCode !== StatusCodes.NotModified;
  }

  /**
   * Gets an async iterator which will yield pages of results from Azure Cosmos DB.
   */
  public async *getAsyncIterator(): AsyncIterable<ChangeFeedResponse<Array<T & Resource>>> {
    while (this.hasMoreResults) {
      const result = await this.executeNext();
      yield result;
    }
  }

  /**
   * Reads the feed and retrieves the next page of results from Azure Cosmos DB.
   */
  public async executeNext(): Promise<ChangeFeedResponse<Array<T & Resource>>> {
    const response = await this.getFeedResponse();
    this.lastStatusCode = response.statusCode;
    this.nextIfNoneMatch = response.headers[Constants.HttpHeaders.ETag];
    return response;
  }

  private async getFeedResponse(): Promise<ChangeFeedResponse<Array<T & Resource>>> {
    const isPartitioned = await this.isPartitionedContainer();
    if (!this.isPartitionSpecified && isPartitioned) {
      throw new Error("Container is partitioned, but no partition key or partition key range id was specified.");
    }
    const feedOptions: FeedOptions = { initialHeaders: {}, a_im: "Incremental feed" };

    if (isNumber(this.changeFeedOptions.maxItemCount)) {
      feedOptions.maxItemCount = this.changeFeedOptions.maxItemCount;
    }

    if (this.changeFeedOptions.sessionToken) {
      feedOptions.sessionToken = this.changeFeedOptions.sessionToken;
    }

    if (this.nextIfNoneMatch) {
      feedOptions.accessCondition = {
        type: Constants.HttpHeaders.IfNoneMatch,
        condition: this.nextIfNoneMatch
      };
    }

    if (this.ifModifiedSince) {
      feedOptions.initialHeaders[Constants.HttpHeaders.IfModifiedSince] = this.ifModifiedSince;
    }

    if (this.partitionKey !== undefined) {
      feedOptions.partitionKey = this.partitionKey as any; // TODO: our partition key is too restrictive on the main object
    }

    const response: Response<Array<T & Resource>> = await (this.clientContext.queryFeed<T>(
      this.resourceLink,
      ResourceType.item,
      this.resourceId,
      result => (result ? result.Documents : []),
      undefined,
      feedOptions
    ) as Promise<any>); // TODO: some funky issues with query feed. Probably need to change it up.

    return new ChangeFeedResponse(
      response.result,
      response.result ? response.result.length : 0,
      response.statusCode,
      response.headers
    );
  }
}
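
The iterator above only ever advances through `executeNext()`; `hasMoreResults` flips to false once the service answers 304 NotModified, and `getAsyncIterator()` simply loops the two. A minimal sketch of draining the feed with `for await`, assuming a `Container` instance and relative import paths like the ones used elsewhere in this change:

```typescript
import { Container } from "./client"; // path assumed, mirroring this file's imports

// Sketch: read the change feed for partition key "0" until we catch up.
// An empty page means the feed is drained for now (the service returned 304).
async function drainChangeFeed(container: Container): Promise<void> {
  const iterator = container.items.readChangeFeed("0", { startFromBeginning: true });
  for await (const page of iterator.getAsyncIterator()) {
    if (page.result.length === 0) {
      break; // caught up to "now"
    }
    console.log(`page of ${page.count} changes, continuation: ${page.continuation}`);
  }
}
```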
@@ -0,0 +1,34 @@
/**
 * Specifies options for the change feed
 *
 * Some of these options control where and when to start reading from the change feed. The order of precedence is:
 * - continuation
 * - startTime
 * - startFromBeginning
 *
 * If none of those options are set, it will start reading changes from the first `ChangeFeedIterator.executeNext()` call.
 */
export interface ChangeFeedOptions {
  /**
   * Maximum number of items to return per page
   */
  maxItemCount?: number;
  /**
   * The continuation token to start from.
   *
   * This is equivalent to the etag and continuation value from the `ChangeFeedResponse`
   */
  continuation?: string;
  /**
   * The session token to use. If not specified, will use the most recent captured session token to start with.
   */
  sessionToken?: string;
  /**
   * Signals whether to start from the beginning or not.
   */
  startFromBeginning?: boolean;
  /**
   * Specifies the start time to start reading changes from.
   */
  startTime?: Date;
}
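
Given the precedence above, the four starting positions exercised by the sample app map onto option bags like these (a hedged sketch; the continuation and date values are placeholders, since real ones come from earlier responses):

```typescript
import { ChangeFeedOptions } from "./ChangeFeedOptions"; // path assumed

// The four scenarios from the sample app, expressed as options objects.
const fromContinuation: ChangeFeedOptions = { continuation: "123" }; // a previously saved etag/lsn
const fromPointInTime: ChangeFeedOptions = { startTime: new Date() };
const fromBeginning: ChangeFeedOptions = { startFromBeginning: true };
const fromNow: ChangeFeedOptions = {}; // nothing set: reading starts at the first executeNext() call
```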
@@ -0,0 +1,83 @@
import { Constants } from "./common";
import { IHeaders } from "./queryExecutionContext";

/**
 * A single response page from the Azure Cosmos DB Change Feed
 */
export class ChangeFeedResponse<T> {
  /**
   * @internal
   * @hidden
   *
   * @param result
   * @param count
   * @param statusCode
   * @param headers
   */
  constructor(
    /**
     * Gets the items returned in the response from Azure Cosmos DB
     */
    public readonly result: T,
    /**
     * Gets the number of items returned in the response from Azure Cosmos DB
     */
    public readonly count: number,
    /**
     * Gets the status code of the response from Azure Cosmos DB
     */
    public readonly statusCode: number,
    headers: IHeaders
  ) {
    this.headers = Object.freeze(headers);
  }

  /**
   * Gets the request charge for this request from the Azure Cosmos DB service.
   */
  public get requestCharge(): number {
    const rus = this.headers[Constants.HttpHeaders.RequestCharge];
    // Request charges can be fractional, so parse as a float rather than an int.
    return rus ? parseFloat(rus) : null;
  }

  /**
   * Gets the activity ID for the request from the Azure Cosmos DB service.
   */
  public get activityId(): string {
    return this.headers[Constants.HttpHeaders.ActivityId];
  }

  /**
   * Gets the continuation token to be used for continuing enumeration of the Azure Cosmos DB service.
   *
   * This is equivalent to the `etag` property.
   */
  public get continuation(): string {
    return this.etag;
  }

  /**
   * Gets the session token for use in session consistency reads from the Azure Cosmos DB service.
   */
  public get sessionToken(): string {
    return this.headers[Constants.HttpHeaders.SessionToken];
  }

  /**
   * Gets the entity tag associated with the last transaction in the Azure Cosmos DB service,
   * which can be used as the If-None-Match access condition for a ReadFeed REST request, or as the
   * `continuation` property of the `ChangeFeedOptions` parameter for
   * `Items.readChangeFeed()`
   * to get feed changes since the transaction specified by this entity tag.
   *
   * This is equivalent to the `continuation` property.
   */
  public get etag(): string {
    return this.headers[Constants.HttpHeaders.ETag];
  }

  /**
   * Response headers of the response from Azure Cosmos DB
   */
  public headers: IHeaders;
}
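
Because `continuation` is just the response `etag`, a token captured from one page can be handed back through `ChangeFeedOptions.continuation` to resume reading later, which is exactly what the tests further down assert. A small sketch, assuming a `Container` as in those tests:

```typescript
import { Container } from "./client"; // path assumed

// Sketch: capture a continuation token, then resume from it with a fresh iterator.
async function resumeFromToken(container: Container) {
  const firstIterator = container.items.readChangeFeed({ startFromBeginning: true });
  const page = await firstIterator.executeNext();
  const token = page.continuation; // same value as page.etag

  // ...later, possibly after a restart:
  const resumed = container.items.readChangeFeed({ continuation: token });
  return (await resumed.executeNext()).result; // only changes recorded after `token`
}
```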
@@ -1,4 +1,5 @@
import { Constants, CosmosClientOptions, IHeaders, QueryIterator, RequestOptions, Response, SqlQuerySpec } from ".";
import { PartitionKeyRange } from "./client/Container/PartitionKeyRange";
import { Resource } from "./client/Resource";
import { Helper, StatusCodes, SubStatusCodes } from "./common";
import { ConnectionPolicy, ConsistencyLevel, DatabaseAccount, QueryCompatibilityMode } from "./documents";

@@ -155,7 +156,7 @@ export class ClientContext {
    const cb: FetchFunctionCallback = innerOptions => {
      return this.queryFeed(path, "pkranges", id, result => result.PartitionKeyRanges, query, innerOptions);
    };
    return new QueryIterator(this, query, options, cb);
    return new QueryIterator<PartitionKeyRange>(this, query, options, cb);
  }

  public async delete<T>(
@@ -2,7 +2,7 @@ import { PartitionKey } from "../..";
import { ClientContext } from "../../ClientContext";
import { Helper, UriFactory } from "../../common";
import { PartitionKeyDefinition } from "../../documents";
import { CosmosResponse, RequestOptions } from "../../request";
import { CosmosResponse, FeedOptions, RequestOptions } from "../../request";
import { Conflict, Conflicts } from "../Conflict";
import { Database } from "../Database";
import { Item, Items } from "../Item";

@@ -207,6 +207,11 @@ export class Container {
    };
  }

  public readPartitionKeyRanges(feedOptions?: FeedOptions) {
    feedOptions = feedOptions || {};
    return this.clientContext.queryPartitionKeyRanges(this.url, undefined, feedOptions);
  }

  // TODO: The PartitionKey type is REALLY weird. Now that it's being exported, we should clean it up.
  public extractPartitionKey(document: any, partitionKeyDefinition: PartitionKeyDefinition): PartitionKey[] {
    // TODO: any
@@ -0,0 +1,9 @@
export interface PartitionKeyRange {
  id: string;
  minInclusive: string;
  maxExclusive: string;
  ridPrefix: number;
  throughputFraction: number;
  status: string;
  parents: string[];
}
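
With this interface exported, the `readPartitionKeyRanges()` call added to `Container` above comes back as a typed `QueryIterator<PartitionKeyRange>`. A short sketch of enumerating a container's physical partitions, mirroring the CRUD test later in this change:

```typescript
import { Container, PartitionKeyRange } from "./client"; // path assumed

// Sketch: list the container's partition key ranges (the CRUD test below
// asserts that a container has at least one).
async function listRanges(container: Container): Promise<PartitionKeyRange[]> {
  const { result: ranges } = await container.readPartitionKeyRanges().toArray();
  for (const range of ranges) {
    console.log(`${range.id}: [${range.minInclusive}, ${range.maxExclusive})`);
  }
  return ranges;
}
```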
@@ -2,3 +2,4 @@ export { Container } from "./Container";
export { Containers } from "./Containers";
export { ContainerDefinition } from "./ContainerDefinition";
export { ContainerResponse } from "./ContainerResponse";
export { PartitionKeyRange } from "./PartitionKeyRange";
@@ -1,3 +1,5 @@
import { ChangeFeedIterator } from "../../ChangeFeedIterator";
import { ChangeFeedOptions } from "../../ChangeFeedOptions";
import { ClientContext } from "../../ClientContext";
import { Helper } from "../../common";
import { FetchFunctionCallback, SqlQuerySpec } from "../../queryExecutionContext";

@@ -9,6 +11,13 @@ import { Item } from "./Item";
import { ItemDefinition } from "./ItemDefinition";
import { ItemResponse } from "./ItemResponse";

import assert from "assert";
import { isBoolean, isNumber, isString } from "util";

function isChangeFeedOptions(options: unknown): options is ChangeFeedOptions {
  return !!options && !(isString(options) || isBoolean(options) || isNumber(options));
}

/**
 * Operations for creating new items, and reading/querying all items
 *

@@ -72,6 +81,77 @@ export class Items {
    return new QueryIterator(this.clientContext, query, options, fetchFunction, this.container.url);
  }

  /**
   * Create a `ChangeFeedIterator` to iterate over pages of changes
   *
   * @param partitionKey
   * @param changeFeedOptions
   *
   * @example Read from the beginning of the change feed.
   * ```javascript
   * const iterator = items.readChangeFeed({ startFromBeginning: true });
   * const firstPage = await iterator.executeNext();
   * const firstPageResults = firstPage.result;
   * const secondPage = await iterator.executeNext();
   * ```
   */
  public readChangeFeed(
    partitionKey: string | number | boolean,
    changeFeedOptions: ChangeFeedOptions
  ): ChangeFeedIterator<any>;
  /**
   * Create a `ChangeFeedIterator` to iterate over pages of changes
   *
   * @param changeFeedOptions
   */
  public readChangeFeed(changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>;
  /**
   * Create a `ChangeFeedIterator` to iterate over pages of changes
   *
   * @param partitionKey
   * @param changeFeedOptions
   */
  public readChangeFeed<T>(
    partitionKey: string | number | boolean,
    changeFeedOptions: ChangeFeedOptions
  ): ChangeFeedIterator<T>;
  /**
   * Create a `ChangeFeedIterator` to iterate over pages of changes
   *
   * @param changeFeedOptions
   */
  public readChangeFeed<T>(changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<T>;
  public readChangeFeed<T>(
    partitionKeyOrChangeFeedOptions?: string | number | boolean | ChangeFeedOptions,
    changeFeedOptions?: ChangeFeedOptions
  ): ChangeFeedIterator<T> {
    let partitionKey: string | number | boolean;
    if (!changeFeedOptions && isChangeFeedOptions(partitionKeyOrChangeFeedOptions)) {
      partitionKey = undefined;
      changeFeedOptions = partitionKeyOrChangeFeedOptions;
    } else if (partitionKeyOrChangeFeedOptions !== undefined && !isChangeFeedOptions(partitionKeyOrChangeFeedOptions)) {
      partitionKey = partitionKeyOrChangeFeedOptions;
    }

    if (!changeFeedOptions) {
      throw new Error("changeFeedOptions must be a valid object");
    }

    const path = Helper.getPathFromLink(this.container.url, "docs");
    const id = Helper.getIdFromLink(this.container.url);
    return new ChangeFeedIterator<T>(
      this.clientContext,
      id,
      path,
      partitionKey,
      async () => {
        const bodyWillBeTruthyIfPartitioned = (await this.container.getPartitionKeyDefinition()).body;
        return !!bodyWillBeTruthyIfPartitioned;
      },
      changeFeedOptions
    );
  }

  /**
   * Read all items.
   *
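
The four overloads above funnel into one implementation that uses `isChangeFeedOptions` to decide whether the first argument is a partition key or the options bag, so both call shapes below resolve correctly. A sketch, assuming a container partitioned on a string key as in the sample app:

```typescript
import { Container } from "./client"; // path assumed

// Sketch: the two call shapes accepted by readChangeFeed. On a partitioned
// container, omitting the partition key makes executeNext() throw (see
// ChangeFeedIterator.getFeedResponse above).
function makeIterators(container: Container) {
  const forOnePartition = container.items.readChangeFeed("0", { startFromBeginning: true });
  const noPartitionKey = container.items.readChangeFeed({ startFromBeginning: true });
  return { forOnePartition, noPartitionKey };
}
```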
@@ -78,6 +78,7 @@ export const Constants = {
    // Our custom Azure Cosmos DB headers
    Continuation: "x-ms-continuation",
    PageSize: "x-ms-max-item-count",
    ItemCount: "x-ms-item-count",

    // Request sender generated. Simply echoed by backend.
    ActivityId: "x-ms-activity-id",
@@ -3,4 +3,5 @@ import { IHeaders } from "..";
export interface Response<T> {
  headers?: IHeaders;
  result?: T;
  statusCode?: number;
}
@@ -96,7 +96,7 @@ export function createRequestObject(
        return reject(exception);
      }

      resolve({ result, headers: response.headers as IHeaders });
      resolve({ result, headers: response.headers as IHeaders, statusCode: response.statusCode });
    });
  });
@@ -48,6 +48,9 @@ describe("NodeJS CRUD Tests", function() {
    const { result: results } = await database.containers.query(querySpec).toArray();
    assert(results.length > 0, "number of results for the query should be > 0");

    const { result: ranges } = await container.readPartitionKeyRanges().toArray();
    assert(ranges.length > 0, "container should have at least 1 partition");

    // Replacing indexing policy is allowed.
    containerDef.indexingPolicy.indexingMode = IndexingMode.lazy;
    const { body: replacedContainer } = await container.replace(containerDef);
@@ -1,106 +1,348 @@
import assert from "assert";
import { FeedOptions } from "../..";
import { Container } from "../../client";
import { RequestOptions } from "../..";
import { Container, ContainerDefinition } from "../../client";
import { Helper } from "../../common";
import { getTestContainer, removeAllDatabases } from "../common/TestHelpers";

describe("NodeJS Incremental Feed Tests using 'a_im' and 'IfNoneMatch' options", function() {
  // delete all databases and create sample database
  before(async function() {
    await removeAllDatabases();
  });
function hasDupeKey(items: any[]) {
  if (items && items.length === 0) {
    return false;
  }
  const key = items[0].key;
  let hasDupe = false;
  for (const item of items) {
    if (item.key !== key) {
      hasDupe = true;
      break;
    }
  }
  return hasDupe;
}

  describe("Newly updated documents should be fetched incremetally", function() {
    let container: Container;
describe("Change Feed Iterator", function() {
  this.timeout(process.env.MOCHA_TIMEOUT || 20000);

    // create container and two documents
  describe("Non-partitioned", function() {
    // delete all databases and create sample database
    before(async function() {
      container = await getTestContainer("Newly updated documents should be fetched incrementally");
      await container.items.create({ id: "doc1" });
      await container.items.create({ id: "doc2" });
      await removeAllDatabases();
    });

    after(async function() {
      await container.delete();
    });
    describe("Should only find items after start time", function() {
      let container: Container;

    it("should fetch updated documents only", async function() {
      let options: FeedOptions = { a_im: "Incremental feed" };
      const query = container.items.readAll(options);

      const { result: document, headers } = await query.current();
      assert(headers.etag, "listDocuments response should have etag header");

      const { result: results } = await query.toArray();
      assert.equal(results.length, 2, "initial number of documents should be equal 2");

      document.name = "xyz";

      const { body: replaced } = await container.item(document.id).replace(document);
      assert.deepEqual(replaced.name, "xyz", "replaced document should be valid");

      options = {
        a_im: "Incremental feed",
        accessCondition: {
          type: "IfNoneMatch",
          condition: headers.etag
        }
      };
      const { result: docs } = await container.items.readAll(options).toArray();
      assert.equal(docs.length, 1, "initial number of documents should be equal 1");
      assert.equal(docs[0].name, "xyz", "fetched document should have 'name: xyz'");
      assert.equal(docs[0].id, document.id, "fetched document should be valid");
    });
  });

  describe("Newly created documents should be fetched incrementally", async function() {
    let container: Container;

    // create container and one document
    before(async function() {
      container = await getTestContainer("Newly updated documents should be fetched incrementally");
      await container.items.create({ id: "doc1" });
    });

    after(async function() {
      await container.delete();
    });

    it("should fetch new documents only", async function() {
      let options: FeedOptions = { a_im: "Incremental feed" };
      let query = container.items.readAll(options);

      let { result, headers } = await query.current();
      assert(headers.etag, "listDocuments response should have etag header");

      const { body: document } = await container.items.create({
        id: "doc2",
        prop: 1
      // create container and two items
      before(async function() {
        container = await getTestContainer("Newly updated items should be fetched incrementally");
      });

      options = {
        a_im: "Incremental feed",
        accessCondition: {
          type: "IfNoneMatch",
          condition: headers.etag
      after(async function() {
        await container.delete();
      });

      it("should fetch updated items only with start time", async function() {
        await container.items.create({ id: "item1" });
        const date = new Date();
        await Helper.sleep(3000);
        await container.items.create({ id: "item2" });
        const iterator = container.items.readChangeFeed({ startTime: date });

        const { result: itemsShouldBeEmpty, etag: initialEtag } = await iterator.executeNext();

        assert(initialEtag, "change feed response should have etag header");
        const etag = initialEtag;

        assert.equal(itemsShouldBeEmpty.length, 0, "Initial request should have empty results");

        const { result: items } = await iterator.executeNext();

        assert.equal(items.length, 1, "initial number of items should be equal 1");
        assert.equal(items[0].id, "item2", "should find the newest item, but not the old");
        const item = { id: "item2", name: "xyz" };

        const { body: replaced } = await container.item(item.id).replace(item);
        assert.deepEqual(replaced.name, "xyz", "replaced item should be valid");

        // Should continue from last etag
        const { result: itemsAfterUpdate } = await iterator.executeNext();
        assert.equal(itemsAfterUpdate.length, 1, "initial number of items should be equal 1");
        assert.equal(itemsAfterUpdate[0].name, "xyz", "fetched item should have 'name: xyz'");
        assert.equal(itemsAfterUpdate[0].id, item.id, "fetched item should be valid");

        // Equivalent to execute next on other iterator from the previous etag
        const iteratorWithContinuation = container.items.readChangeFeed({ continuation: etag });
        const { result: itemsWithContinuation } = await iteratorWithContinuation.executeNext();
        assert.equal(itemsWithContinuation.length, 1, "initial number of items should be equal 1");
        assert.equal(itemsWithContinuation[0].name, "xyz", "fetched item should have 'name: xyz'");
        assert.equal(itemsWithContinuation[0].id, item.id, "fetched item should be valid");
      });
    });

    describe("Newly updated items should be fetched incrementally", function() {
      let container: Container;

      // create container and two items
      before(async function() {
        container = await getTestContainer("Newly updated items should be fetched incrementally");
        await container.items.create({ id: "item1" });
        await container.items.create({ id: "item2" });
      });

      after(async function() {
        await container.delete();
      });

      it("should fetch updated items only", async function() {
        const iterator = container.items.readChangeFeed({ startFromBeginning: true });

        const { result: items, headers } = await iterator.executeNext();
        assert(headers.etag, "change feed response should have etag header");
        const etag = headers.etag;

        assert.equal(items.length, 2, "initial number of items should be equal 2");

        const item = items[1];
        item.name = "xyz";

        const { body: replaced } = await container.item(item.id).replace(item);
        assert.deepEqual(replaced.name, "xyz", "replaced item should be valid");

        // Should continue from last etag
        const { result: itemsAfterUpdate } = await iterator.executeNext();
        assert.equal(itemsAfterUpdate.length, 1, "initial number of items should be equal 1");
        assert.equal(itemsAfterUpdate[0].name, "xyz", "fetched item should have 'name: xyz'");
        assert.equal(itemsAfterUpdate[0].id, item.id, "fetched item should be valid");

        // Equivalent to execute next on other iterator from the previous etag
        const iteratorWithContinuation = container.items.readChangeFeed({ continuation: etag });
        const { result: itemsWithContinuation } = await iteratorWithContinuation.executeNext();
        assert.equal(itemsWithContinuation.length, 1, "initial number of items should be equal 1");
        assert.equal(itemsWithContinuation[0].name, "xyz", "fetched item should have 'name: xyz'");
        assert.equal(itemsWithContinuation[0].id, item.id, "fetched item should be valid");
      });
    });

    describe("Async iterator should find items", function() {
      let container: Container;

      // create container and two items
      before(async function() {
        container = await getTestContainer("Newly updated items should be fetched incrementally");
        await container.items.create({ id: "item1" });
        await container.items.create({ id: "item2" });
      });

      after(async function() {
        await container.delete();
      });

      it("should fetch updated items only", async function() {
        const iterator = container.items.readChangeFeed({ startFromBeginning: true });

        const items: any[] = [];
        for await (const page of iterator.getAsyncIterator()) {
          if (page.result.length === 0) {
            break;
          }
          items.push(...page.result);
        }
      };
      query = await container.items.readAll(options);
      ({ result, headers } = await query.current());

      assert.notDeepEqual(result, document, "actual should not match with expected value.");
      delete result._lsn;
      delete result._metadata;
      assert.deepEqual(result, document, "actual value doesn't match with expected value.");
        assert.equal(items.length, 2, "initial number of items should be equal 2");

      options.accessCondition.condition = headers.etag;
        const item = items[1];
        item.name = "xyz";

      const { result: results } = await container.items.readAll(options).toArray();
      assert.equal(results.length, 0, "should be nothing new");
        const { body: replaced } = await container.item(item.id).replace(item);
        assert.deepEqual(replaced.name, "xyz", "replaced item should be valid");

      await container.items.create({ id: "doc3" });
      await container.items.create({ id: "doc4" });
      const { result: docs } = await container.items.readAll(options).toArray();
      assert.equal(docs.length, 2, "there should be 2 results");
        // Should continue from last etag
        const itemsAfterUpdate: any[] = [];
        for await (const page of iterator.getAsyncIterator()) {
          if (page.result.length === 0) {
            break;
          }
          itemsAfterUpdate.push(...page.result);
        }
        assert.equal(itemsAfterUpdate.length, 1, "initial number of items should be equal 1");
        assert.equal(itemsAfterUpdate[0].name, "xyz", "fetched item should have 'name: xyz'");
        assert.equal(itemsAfterUpdate[0].id, item.id, "fetched item should be valid");
      });
    });

    describe("Newly created items should be fetched incrementally", async function() {
      let container: Container;

      // create container and one item
      before(async function() {
        container = await getTestContainer("Newly updated items should be fetched incrementally");
        await container.items.create({ id: "item1" });
      });

      after(async function() {
        await container.delete();
      });

      it("should fetch new items only", async function() {
        const iterator = container.items.readChangeFeed({});

        const { result: items, headers } = await iterator.executeNext();
        assert(headers.etag, "change feed response should have etag header");
        assert.equal(items.length, 0, "change feed response should have no items on it initially");

        const { body: itemThatWasCreated } = await container.items.create({
          id: "item2",
          prop: 1
        });

        const { result: itemsAfterCreate } = await iterator.executeNext();
        assert.equal(itemsAfterCreate.length, 1, "should have 1 item from create");
        const itemThatWasFound = itemsAfterCreate[0];

        assert.notDeepEqual(itemThatWasFound, itemThatWasCreated, "actual should not match with expected value.");
        delete itemThatWasFound._lsn;
        delete itemThatWasFound._metadata;
        assert.deepEqual(itemThatWasFound, itemThatWasCreated, "actual value doesn't match with expected value.");

        const { result: itemsShouldBeEmptyWithNoNewCreates } = await iterator.executeNext();
        assert.equal(itemsShouldBeEmptyWithNoNewCreates.length, 0, "should be nothing new");

        await container.items.create({ id: "item3" });
        await container.items.create({ id: "item4" });
        const { result: itemsShouldHave2NewItems } = await iterator.executeNext();
        assert.equal(itemsShouldHave2NewItems.length, 2, "there should be 2 results");
      });
    });
  });

  describe("Partition Key", function() {
    // delete all databases and create sample database
    before(async function() {
      await removeAllDatabases();
    });

    describe("Newly updated items should be fetched incrementally", function() {
      let container: Container;

      // create container and two items
      before(async function() {
        const containerDef: ContainerDefinition = {
          partitionKey: {
            kind: "Hash",
            paths: ["/key"]
          }
        };
        const throughput: RequestOptions = { offerThroughput: 25100 };
        container = await getTestContainer(
          "Newly updated items should be fetched incrementally",
          undefined,
          containerDef,
          throughput
        );
        await container.items.create({ id: "item1", key: "0" });
        await container.items.create({ id: "item2", key: "0" });
        await container.items.create({ id: "item1", key: "1" });
        await container.items.create({ id: "item2", key: "1" });
      });

      after(async function() {
        await container.delete();
      });

      it("should throw if used with no partition key or partition key range id", async function() {
        const iterator = container.items.readChangeFeed({ startFromBeginning: true });

        try {
          await iterator.executeNext();
        } catch (err) {
          assert.equal(
            err.message,
            "Container is partitioned, but no partition key or partition key range id was specified."
          );
          return;
        }
        assert.fail("Should have failed");
      });

      it("should fetch updated items only", async function() {
        const iterator = container.items.readChangeFeed("0", { startFromBeginning: true });

        const { result: items, headers } = await iterator.executeNext();
        assert(headers.etag, "change feed response should have etag header");

        assert.equal(items.length, 2, "initial number of items should be equal 2");

        const item = items[1];
        item.name = "xyz";

        const { body: replaced } = await container.item(item.id).replace(item);
        assert.deepEqual(replaced.name, "xyz", "replaced item should be valid");

        const { result: itemsAfterUpdate } = await iterator.executeNext();
        assert.equal(itemsAfterUpdate.length, 1, "initial number of items should be equal 1");
        assert.equal(itemsAfterUpdate[0].name, "xyz", "fetched item should have 'name: xyz'");
        assert.equal(itemsAfterUpdate[0].id, item.id, "fetched item should be valid");
      });
    });

    describe("Newly created items should be fetched incrementally", async function() {
      let container: Container;

      // create container and one item
      before(async function() {
        const containerDef: ContainerDefinition = {
          partitionKey: {
            kind: "Hash",
            paths: ["/key"]
          }
        };
        const throughput: RequestOptions = { offerThroughput: 25100 };
        container = await getTestContainer(
          "Newly updated items should be fetched incrementally",
          undefined,
          containerDef,
          throughput
        );
        await container.items.create({ id: "item1", key: "0" });
        await container.items.create({ id: "item1", key: "1" });
      });

      after(async function() {
        await container.delete();
      });

      it("should fetch new items only", async function() {
        const iterator = container.items.readChangeFeed("0", {});

        const { result: items, headers } = await iterator.executeNext();
        assert(headers.etag, "change feed response should have etag header");
        assert.equal(items.length, 0, "change feed response should have no items on it initially");

        const { body: itemThatWasCreated, headers: createHeaders } = await container.items.create({
          id: "item2",
          prop: 1,
          key: "0"
        });
        console.log(`createHeaders: ${createHeaders}`);

        const { result: itemsAfterCreate } = await iterator.executeNext();
        assert.equal(itemsAfterCreate.length, 1, "should have 1 item from create");
        const itemThatWasFound = itemsAfterCreate[0];

        assert.notDeepEqual(itemThatWasFound, itemThatWasCreated, "actual should not match with expected value.");
        delete itemThatWasFound._lsn;
        delete itemThatWasFound._metadata;
        assert.deepEqual(itemThatWasFound, itemThatWasCreated, "actual value doesn't match with expected value.");

        const { result: itemsShouldBeEmptyWithNoNewCreates } = await iterator.executeNext();
        assert.equal(itemsShouldBeEmptyWithNoNewCreates.length, 0, "should be nothing new");

        await container.items.create({ id: "item3", key: "0" });
        await container.items.create({ id: "item4", key: "0" });
        await container.items.create({ id: "item3", key: "1" });
        await container.items.create({ id: "item4", key: "1" });
        const { result: itemsShouldHave2NewItems } = await iterator.executeNext();
        assert.equal(itemsShouldHave2NewItems.length, 2, "there should be 2 results");
      });
    });
  });
});
@@ -47,7 +47,7 @@ describe("Session Token", function() {
    await removeAllDatabases();
  });

  it("validate session tokens for sequence of opearations", async function() {
  it("validate session tokens for sequence of operations", async function() {
    const database = await getTestDatabase("session test", client);

    const { body: createdContainerDef } = await database.containers.create(containerDefinition, containerOptions);

@@ -267,13 +267,14 @@ describe("Session Token", function() {
  });

  it("validate 'lsn not caught up' error for higher lsn and clearing session token", async function() {
    this.retries(2);
    const database = await getTestDatabase("session test", client);

    const containerLink = "dbs/" + database.id + "/colls/" + containerId;
    const increaseLSN = function(oldTokens: Map<string, Map<string, VectorSessionToken>>) {
      for (const [coll, tokens] of oldTokens.entries()) {
        for (const [pk, token] of tokens.entries()) {
          (token as any).globalLsn = (token as any).version + 200;
          (token as any).globalLsn = (token as any).globalLsn + 200;
          const newToken = token.merge(token);
          return `0:${newToken.toString()}`;
        }
      }