azure-sdk-for-js/sdk/cosmosdb/cosmos/samples-dev/ChangeFeed.ts

172 строки
5.7 KiB
TypeScript

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
/**
* @summary Demonstrates using a ChangeFeed.
*/
import * as dotenv from "dotenv";
dotenv.config();
import { finish, handleError, logSampleHeader } from "./Shared/handleError";
import { CosmosClient } from "@azure/cosmos";
const key = process.env.COSMOS_KEY || "<cosmos key>";
const endpoint = process.env.COSMOS_ENDPOINT || "<cosmos endpoint>";
const databaseId = process.env.COSMOS_DATABASE || "<cosmos database>";
const containerId = process.env.COSMOS_CONTAINER || "<cosmos container>";
logSampleHeader("Change Feed");
// Establish a new instance of the CosmosClient to be used throughout this demo
const client = new CosmosClient({ endpoint, key });
// We'll use the same pk value for all these samples
const pk = "0";
function doesMatch(actual: any[], expected: any[]): string {
for (let i = 0; i < actual.length; i++) {
if (actual[i] !== expected[i]) {
return "❌";
}
}
return "✅";
}
function logResult(scenario: string, actual: any[], expected: any[]): void {
const status = doesMatch(actual, expected);
console.log(
` ${status} ${scenario} - expected: [${expected.join(", ")}] - actual: [${actual.join(", ")}]`,
);
}
async function run(): Promise<void> {
const { database } = await client.databases.createIfNotExists({ id: databaseId });
const { container } = await database.containers.createIfNotExists({
id: containerId,
partitionKey: { paths: ["/pk"] },
});
try {
console.log(`
✨✨✨ Change Feed Samples ✨✨✨
There are 4 scenarios for change feed:
1. Start from a specific continuation
2. Start from a specific point in time
3. Start from the beginning
4. Start from now
All 4 scenarios will eventually catch up to each other if read for long enough
In this sample, we expect the scenario to see the following items, by id:
1. [3]
2. [2, 3]
3. [1, 2, 3]
4. []
After we've read to this point, if we insert a new item id 4, we expect all of them to see it, since they will all be caught up.
`);
console.log("📢 Phase 1: All scenarios see different results ");
await container.items.create({ id: "1", pk });
console.log(" 👉 Inserted id=1");
const now = new Date();
console.log(" 👉 Saved timestamp for the specific point in time scenario");
const { headers } = await container.items.create({ id: "2", pk });
const lsn = headers["lsn"];
console.log(` 👉 Inserted id=2 after timestamp with LSN of ${lsn}`);
await container.items.create({ id: "3", pk });
console.log(` 👉 Inserted id=3`);
const specificContinuationIterator = container.items.changeFeed(pk, {
continuation: lsn.toString(),
});
const specificPointInTimeIterator = container.items.changeFeed(pk, { startTime: now });
const fromBeginningIterator = container.items.changeFeed(pk, { startFromBeginning: true });
const fromNowIterator = container.items.changeFeed(pk, {});
const { result: specificContinuationResult } = await specificContinuationIterator.fetchNext();
logResult(
"initial specific Continuation scenario",
[3],
specificContinuationResult.map((v) => parseInt(v.id)),
);
// First page is empty. It is catching up to a valid continuation.
const { result: shouldBeEmpty } = await specificPointInTimeIterator.fetchNext();
logResult(
"initial specific point in time scenario should be empty while it finds the right continuation",
[],
shouldBeEmpty.map((v) => parseInt(v.id)),
);
// Second page should have results
const { result: specificPointInTimeResults } = await specificPointInTimeIterator.fetchNext();
logResult(
"second specific point in time scenario should have caught up now",
[2, 3],
specificPointInTimeResults.map((v) => parseInt(v.id)),
);
const { result: fromBeginningResults } = await fromBeginningIterator.fetchNext();
logResult(
"initial from beginning scenario",
[1, 2, 3],
fromBeginningResults.map((v) => parseInt(v.id)),
);
const { result: fromNowResultsShouldBeEmpty } = await fromNowIterator.fetchNext();
logResult(
"initial from now scenario should be empty",
[],
fromNowResultsShouldBeEmpty.map((v) => parseInt(v.id)),
);
// Now they should all be caught up to the point after id=3, so if we insert a id=4, they should all get it.
console.log("📢 Phase 2: All scenarios are caught up and should see the same results");
await container.items.create({ id: "4", pk });
console.log(" 👉 Inserting id=4 - all scenarios should see this");
const { result: specificContinuationResult2 } = await specificContinuationIterator.fetchNext();
logResult(
"after insert, Specific Continuation scenario",
[4],
specificContinuationResult2.map((v) => parseInt(v.id)),
);
const { result: specificPointInTimeResults2 } = await specificPointInTimeIterator.fetchNext();
logResult(
"after insert, specific point in time scenario",
[4],
specificPointInTimeResults2.map((v) => parseInt(v.id)),
);
const { result: fromBeginningResults2 } = await fromBeginningIterator.fetchNext();
logResult(
"after insert, from beginning scenario",
[4],
fromBeginningResults2.map((v) => parseInt(v.id)),
);
const { result: fromNowResults2 } = await fromNowIterator.fetchNext();
logResult(
"after insert, from now scenario",
[4],
fromNowResults2.map((v) => parseInt(v.id)),
);
} catch (err: any) {
if (err) {
console.log("Threw, as expected");
} else {
throw err;
}
} finally {
await finish();
}
}
run().catch(handleError);