Implementation and tests for TableCheckpointStore (#16685)

* implementation of tablecheckpointsore

* renamed file to .spec.ts

* updated tablecheckpointstore implementation

* updated tests for listownerships

* changed return to print in listOwnerships

* removed unnecessary comments

* formatted files

* updated tests

* updated claim ownership

* updated tablecheckpointstore

* updated 'updateCheckpoints'

* updated updatecheckpoints

* updated updatecheckpointstore

* wrote tests for updatecheckpoint

* update tablechcekpointstore

* updated tablecheckpointstore

* implemented claimowenership

* added nyc to testnode

* replaced export

* deleted temp folder from common

* run rush update

* updated tableCheckpointstore

* fixed testUtils

* changed let to const

* run npm format

* renamed customCheckpoint

* implemented updateCheckpoint

* changed format for partitionKey

* updated claimownership

* changed checkpoint entity format

* updated claimownership

* implemented tests for claimOwnership

* wrote tests for claimOwnership

* implemented claimownership

* removed space

* updated table version

* added more tests for claimOwnership

* added tests to claimownership

* run rushx format

* changed date to timetsamp

* foramtted files

* Update sdk/eventhub/eventhubs-checkpointstore-table/src/tableCheckpointStore.ts

Co-authored-by: chradek <51000525+chradek@users.noreply.github.com>

* updated implementation

* corrected files

* updated tests

* updated test

* added tests

* updated tests

* formatted files

* mutated ownership list

* updated tests

* Update sdk/eventhub/eventhubs-checkpointstore-table/src/tableCheckpointStore.ts

Co-authored-by: chradek <51000525+chradek@users.noreply.github.com>

* Update sdk/eventhub/eventhubs-checkpointstore-table/src/tableCheckpointStore.ts

Co-authored-by: chradek <51000525+chradek@users.noreply.github.com>

* removed extra space

* formatted files

* [eventhubs-checkpointstore-table] add STORAGE_ACCOUNT_NAME and STORAGE_ACCOUNT_KEY outputs to test-resources for running live tests

* updated tests

* format

* added sample.env file

* updated tests

* updated tests

* reverted changes

Co-authored-by: t-bntiamoah@microsoft.com <t-bntiamoah@microsoft.com>
Co-authored-by: chradek <51000525+chradek@users.noreply.github.com>
Co-authored-by: Christopher Radek <chradek@microsoft.com>
This commit is contained in:
baffyntiamoah 2021-08-11 12:23:11 -04:00 коммит произвёл GitHub
Родитель d87c7d513c
Коммит e22c8993b5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 789 добавлений и 46 удалений

Просмотреть файл

@ -446,6 +446,22 @@ packages:
node: '>=8.0.0'
resolution:
integrity: sha512-7d2w0yd8pb1c9aj87JV/1ntOp+sCMcJ9QoGDxs6/7BLDh8Gb6kd2h3n+9JYhcLZO8wdHZb4d4GZgmRIwaAU72w==
/@azure/data-tables/12.1.0:
dependencies:
'@azure/core-auth': 1.3.2
'@azure/core-client': 1.2.2
'@azure/core-paging': 1.1.3
'@azure/core-rest-pipeline': 1.1.1
'@azure/core-tracing': 1.0.0-preview.12
'@azure/core-xml': 1.0.0-beta.1
'@azure/logger': 1.0.2
tslib: 2.3.0
uuid: 8.3.2
dev: false
engines:
node: '>=12.0.0'
resolution:
integrity: sha512-Ds6vgsxD05xlEfAPVhCKxSaWHPsb9ZJJHg93J+e60zdAhWcbzN8RUUzIbPFkHZ/KWNBS9wMP1n5efKZuGoekGA==
/@azure/event-hubs/2.1.4:
dependencies:
'@azure/amqp-common': 1.0.0-preview.9
@ -10329,6 +10345,7 @@ packages:
version: 0.0.0
file:projects/eventhubs-checkpointstore-table.tgz:
dependencies:
'@azure/data-tables': 12.1.0
'@microsoft/api-extractor': 7.7.11
'@rollup/plugin-commonjs': 11.0.2_rollup@1.32.1
'@rollup/plugin-inject': 4.0.2_rollup@1.32.1
@ -10349,7 +10366,7 @@ packages:
cross-env: 7.0.3
debug: 4.3.2
dotenv: 8.6.0
eslint: 7.31.0
eslint: 7.30.0
esm: 3.2.25
guid-typescript: 1.0.9
inherits: 2.0.4
@ -10382,7 +10399,7 @@ packages:
dev: false
name: '@rush-temp/eventhubs-checkpointstore-table'
resolution:
integrity: sha512-dJ4D33ESkzcF7QKaIAcjx/MBJAimgjA1ANpFM6yqLB/eawi6OCYS0B0ajj49eqQgq22poyfRjcWwO/2+pbPOmA==
integrity: sha512-xxW8Ya855QLqd+Oevmi21IetihZTV+gjZkNYwlrEHCnig0jy+O3JaJMYplRLULBCQPwIFhdyQBo5jLRBp79y8A==
tarball: file:projects/eventhubs-checkpointstore-table.tgz
version: 0.0.0
file:projects/identity-cache-persistence.tgz:

Просмотреть файл

@ -45,14 +45,14 @@
"extract-api": "tsc -p . && api-extractor run --local",
"format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"",
"integration-test:browser": "echo skipped",
"integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 1200000 --full-trace \"dist-esm/test/*.spec.js\"",
"integration-test:node": "nyc mocha -r esm --require ts-node/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 1200000 --full-trace \"test/{,!(browser)/**/}*.spec.ts\"",
"integration-test": "npm run integration-test:node && npm run integration-test:browser",
"lint:fix": "eslint package.json api-extractor.json src test --ext .ts --fix",
"lint": "eslint package.json api-extractor.json src test --ext .ts",
"pack": "npm pack 2>&1",
"prebuild": "npm run clean",
"test:browser": "npm run build:test && npm run unit-test:browser && npm run integration-test:browser",
"test:node": "npm run build:test && npm run unit-test:node && npm run integration-test:node",
"test:node": "npm run integration-test:node",
"test": "npm run build:test && npm run unit-test && npm run integration-test",
"unit-test:browser": "echo skipped",
"unit-test:node": "echo skipped",
@ -62,6 +62,7 @@
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/event-hubs": "^5.6.0",
"@azure/data-tables": "^12.1.1",
"@azure/logger": "^1.0.0",
"tslib": "^2.2.0"
},

Просмотреть файл

@ -7,16 +7,18 @@
import { Checkpoint } from '@azure/event-hubs';
import { CheckpointStore } from '@azure/event-hubs';
import { PartitionOwnership } from '@azure/event-hubs';
import { TableClient } from '@azure/data-tables';
// @public
export const logger: import("@azure/logger").AzureLogger;
// @public
export class TableCheckpointStore implements CheckpointStore {
claimOwnership(): Promise<PartitionOwnership[]>;
listCheckpoints(): Promise<Checkpoint[]>;
listOwnership(): Promise<PartitionOwnership[]>;
updateCheckpoint(): Promise<void>;
constructor(tableClient: TableClient);
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<Checkpoint[]>;
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

Просмотреть файл

@ -0,0 +1,3 @@
# Basic info that will be required all the time.
STORAGE_ACCOUNT_NAME=<Name of the storage account for the checkpoint store>
STORAGE_ACCOUNT_KEY=<Key for the storage account for the checkpoint store>

Просмотреть файл

@ -2,12 +2,68 @@
// Licensed under the MIT license.
import { CheckpointStore, PartitionOwnership, Checkpoint } from "@azure/event-hubs";
import { odata, TableClient, TableInsertEntityHeaders } from "@azure/data-tables";
import { logger, logErrorStackTrace } from "./log";
/**
*
* Checks if the value contains a `Timestamp` field of type `string`.
*/
function _hasTimestamp<T extends TableInsertEntityHeaders>(
value: T
): value is T & { Timestamp: string } {
return typeof (value as any).Timestamp === "string";
}
/**
* A checkpoint entity of type CheckpointEntity to be stored in the table
* @internal
*
*/
export interface CheckpointEntity {
/**
* The partitionKey is a composite key assembled in the following format:
* `${fullyQualifiedNamespace} ${eventHubName} ${consumerGroup} Checkpoint`
*/
partitionKey: string;
/**
* The rowKey is the partitionId
*
*/
rowKey: string;
sequencenumber: string;
offset: string;
}
/**
* An ownership entity of type PartitionOwnership to be stored in the table
* @internal
* @hidden
*/
export interface PartitionOwnershipEntity {
/**
* The partitionKey is a composite key assembled in the following format:
* `${fullyQualifiedNamespace} ${eventHubName} ${consumerGroup} Ownership`
*/
partitionKey: string;
/**
* The rowKey is the partitionId
*
*/
rowKey: string;
ownerid: string;
}
/**
* An implementation of CheckpointStore that uses Azure Table Storage to persist checkpoint data.
*/
export class TableCheckpointStore implements CheckpointStore {
private _tableClient: TableClient;
constructor(tableClient: TableClient) {
this._tableClient = tableClient;
}
/**
* Get the list of all existing partition ownership from the underlying data store. May return empty
* results if there are is no existing ownership information.
@ -22,14 +78,43 @@ export class TableCheckpointStore implements CheckpointStore {
* - `tracingOptions`: Options for configuring tracing.
* @returns Partition ownership details of all the partitions that have had an owner.
*/
async listOwnership(): /*
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroup: string,
options: OperationOptions = {}
*/
Promise<PartitionOwnership[]> {
throw new Error("not implemented");
async listOwnership(
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroup: string
): Promise<PartitionOwnership[]> {
const partitionKey = `${fullyQualifiedNamespace} ${eventHubName} ${consumerGroup} Ownership`;
const partitionOwnershipArray: PartitionOwnership[] = [];
const entitiesIter = this._tableClient.listEntities<PartitionOwnershipEntity>({
queryOptions: { filter: odata`PartitionKey eq ${partitionKey}` }
});
try {
for await (const entity of entitiesIter) {
if (!entity.timestamp) {
throw new Error(
`Unable to retrieve timestamp from partitionKey "${partitionKey}", rowKey "${entity.rowKey}"`
);
}
const partitionOwnership: PartitionOwnership = {
fullyQualifiedNamespace,
eventHubName,
consumerGroup,
ownerId: entity.ownerid,
partitionId: entity.rowKey,
lastModifiedTimeInMs: new Date(entity.timestamp).getTime(),
etag: entity.etag
};
partitionOwnershipArray.push(partitionOwnership);
}
return partitionOwnershipArray;
} catch (err) {
logger.warning(`Error occurred while fetching the list of entities`, err.message);
logErrorStackTrace(err);
if (err?.name === "AbortError") throw err;
throw new Error(`Error occurred while fetching the list of entities. \n${err}`);
}
}
/**
@ -42,13 +127,80 @@ export class TableCheckpointStore implements CheckpointStore {
* - `tracingOptions`: Options for configuring tracing.
* @returns A list partitions this instance successfully claimed ownership.
*/
async claimOwnership(): /*
partitionOwnership: PartitionOwnership[],
options: OperationOptions = {}
*/
Promise<PartitionOwnership[]> {
console.log("nothing to claim");
throw new Error("not implemented");
async claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]> {
const partitionOwnershipArray: PartitionOwnership[] = [];
for (const ownership of partitionOwnership) {
const updatedOwnership = { ...ownership };
const partitionKey = `${ownership.fullyQualifiedNamespace} ${ownership.eventHubName} ${ownership.consumerGroup} Ownership`;
const ownershipEntity: PartitionOwnershipEntity = {
partitionKey: partitionKey,
rowKey: ownership.partitionId,
ownerid: ownership.ownerId
};
// When we have an etag, we know the entity existed.
// If we encounter an error we should fail.
try {
if (ownership.etag) {
const updatedMetadata = await this._tableClient.updateEntity(ownershipEntity, "Replace", {
etag: ownership.etag
});
const entityRetrieved = await this._tableClient.getEntity(
ownershipEntity.partitionKey,
ownershipEntity.rowKey
);
if (!entityRetrieved.timestamp) {
throw new Error(
`Unable to retrieve timestamp from partitionKey "${partitionKey}", rowKey "${entityRetrieved.rowKey}"`
);
}
updatedOwnership.lastModifiedTimeInMs = new Date(entityRetrieved.timestamp).getTime();
updatedOwnership.etag = updatedMetadata.etag;
partitionOwnershipArray.push(updatedOwnership);
logger.info(
`[${ownership.ownerId}] Claimed ownership successfully for partition: ${ownership.partitionId}`,
`LastModifiedTime: ${ownership.lastModifiedTimeInMs}, ETag: ${ownership.etag}`
);
} else {
const newOwnershipMetadata = await this._tableClient.createEntity(ownershipEntity, {
requestOptions: {
customHeaders: {
Prefer: "return-content"
}
}
});
if (!_hasTimestamp(newOwnershipMetadata)) {
throw new Error(
`Unable to retrieve timestamp from partitionKey "${partitionKey}", rowKey "${ownershipEntity.rowKey}"`
);
}
updatedOwnership.lastModifiedTimeInMs = new Date(
newOwnershipMetadata.Timestamp
).getTime();
updatedOwnership.etag = newOwnershipMetadata.etag;
partitionOwnershipArray.push(updatedOwnership);
}
} catch (err) {
if (err.statusCode === 412) {
// etag failures (precondition not met) aren't fatal errors. They happen
// as multiple consumers attempt to claim the same partition (first one wins)
// and losers get this error.
logger.verbose(
`[${ownership.ownerId}] Did not claim partition ${ownership.partitionId}. Another processor has already claimed it.`
);
continue;
}
logger.warning(
`Error occurred while claiming ownership for partition: ${ownership.partitionId}`,
err.message
);
logErrorStackTrace(err);
}
}
return partitionOwnershipArray;
}
/**
@ -62,15 +214,27 @@ export class TableCheckpointStore implements CheckpointStore {
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
*/
async listCheckpoints(): /*
async listCheckpoints(
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroup: string,
options: OperationOptions = {}
*/
Promise<Checkpoint[]> {
console.log("no checkpoints to list");
throw new Error("not implemented");
consumerGroup: string
): Promise<Checkpoint[]> {
const partitionKey = `${fullyQualifiedNamespace} ${eventHubName} ${consumerGroup} Checkpoint`;
const checkpoints: Checkpoint[] = [];
const entitiesIter = this._tableClient.listEntities<CheckpointEntity>({
queryOptions: { filter: odata`PartitionKey eq ${partitionKey}` }
});
for await (const entity of entitiesIter) {
checkpoints.push({
consumerGroup,
eventHubName,
fullyQualifiedNamespace,
partitionId: entity.rowKey,
offset: parseInt(entity.offset, 10),
sequenceNumber: parseInt(entity.sequencenumber, 10)
});
}
return checkpoints;
}
/**
@ -82,8 +246,24 @@ export class TableCheckpointStore implements CheckpointStore {
* - `tracingOptions`: Options for configuring tracing.
* @returns A promise that resolves when the checkpoint has been updated.
*/
async updateCheckpoint(/* checkpoint: Checkpoint */): Promise<void> {
console.log("no checkpoint to update");
throw new Error("not implemented");
async updateCheckpoint(checkpoint: Checkpoint): Promise<void> {
const partitionKey = `${checkpoint.fullyQualifiedNamespace} ${checkpoint.eventHubName} ${checkpoint.consumerGroup} Checkpoint`;
const checkpointEntity: CheckpointEntity = {
partitionKey: partitionKey,
rowKey: checkpoint.partitionId,
sequencenumber: checkpoint.sequenceNumber.toString(),
offset: checkpoint.offset.toString()
};
try {
await this._tableClient.upsertEntity(checkpointEntity);
logger.verbose(`Updated checkpoint successfully for partition: ${checkpoint.partitionId}`);
return;
} catch (err) {
logger.verbose(
`Error occurred while upating the checkpoint for partition: ${checkpoint.partitionId}.`,
err.message
);
throw err;
}
}
}

Просмотреть файл

@ -0,0 +1,514 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import chai from "chai";
const should = chai.should();
import { TableCheckpointStore } from "../src";
import debugModule from "debug";
const debug = debugModule("azure:event-hubs:tableCheckpointStore");
import { Checkpoint, PartitionOwnership } from "@azure/event-hubs";
import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
import { TableServiceClient, AzureNamedKeyCredential, TableClient } from "@azure/data-tables";
import { CheckpointEntity, PartitionOwnershipEntity } from "../src/tableCheckpointStore";
const env = getEnvVars();
/* test to show that test framework is set up well */
describe("TableCheckpointStore", () => {
it("is exported from the package", () => {
should.exist(TableCheckpointStore, "Expected TableCheckpointStore to be exported.");
});
});
const service = {
storageAccountName: env[EnvVarKeys.STORAGE_ACCOUNT_NAME],
storageAccountKey: env[EnvVarKeys.STORAGE_ACCOUNT_KEY]
};
const credential = new AzureNamedKeyCredential(
service.storageAccountName,
service.storageAccountKey
);
const serviceClient = new TableServiceClient(
`https://${service.storageAccountName}.table.core.windows.net`,
credential
);
describe("TableCheckpointStore", function(): void {
let client: TableClient;
let tableName: string;
beforeEach("creating table", async () => {
tableName = `table${new Date().getTime()}${Math.floor(Math.random() * 10) + 1}`;
client = new TableClient(
`https://${service.storageAccountName}.table.core.windows.net`,
tableName,
credential
);
await serviceClient.createTable(tableName);
});
afterEach(async () => {
await serviceClient.deleteTable(tableName);
});
describe("Runs tests on table with no entities", function() {
describe("listOwnership", function() {
it("listOwnership should return an empty array", async function(): Promise<void> {
const checkpointStore = new TableCheckpointStore(client);
const listOwnership = await checkpointStore.listOwnership(
"test.servicebus.windows.net",
"testHub",
"testConsumerGroup"
);
should.equal(listOwnership.length, 0);
});
});
describe("listCheckpoints", function() {
it("listCheckpoint should return an empty array", async function(): Promise<void> {
const checkpointStore = new TableCheckpointStore(client);
const checkpoints = await checkpointStore.listCheckpoints(
"test.servicebus.windows.net",
"testHub",
"testConsumerGroup"
);
should.equal(checkpoints.length, 0);
});
});
describe("updateCheckpoint", function() {
it("creates a checkpoint where one doesn't already exist", async () => {
const checkpointStore = new TableCheckpointStore(client);
const eventHubProperties = {
fullyQualifiedNamespace: "pink.servicebus.windows.net",
eventHubName: "pinkHub",
consumerGroup: "testConsumerGroup"
};
// Ensure that there aren't any checkpoints.
let checkpoints = await checkpointStore.listCheckpoints(
eventHubProperties.fullyQualifiedNamespace,
eventHubProperties.eventHubName,
eventHubProperties.consumerGroup
);
checkpoints.length.should.equal(0);
// Create the checkpoint to add.
const checkpoint: Checkpoint = {
consumerGroup: eventHubProperties.consumerGroup,
eventHubName: eventHubProperties.eventHubName,
fullyQualifiedNamespace: eventHubProperties.fullyQualifiedNamespace,
offset: 0,
partitionId: "0",
sequenceNumber: 1
};
await checkpointStore.updateCheckpoint(checkpoint);
// Ensure that there is a checkpoint.
checkpoints = await checkpointStore.listCheckpoints(
eventHubProperties.fullyQualifiedNamespace,
eventHubProperties.eventHubName,
eventHubProperties.consumerGroup
);
checkpoints.length.should.equal(1);
});
it("forwards errors", async () => {
const checkpointStore = new TableCheckpointStore(client);
const eventHubProperties = {
fullyQualifiedNamespace: "testNamespace.servicebus.windows.net",
eventHubName: "testEventHub",
consumerGroup: "testConsumerGroup"
};
// now let's induce a bad failure (removing the table)
await client.deleteTable();
// Create the checkpoint to add.
const checkpoint: Checkpoint = {
consumerGroup: eventHubProperties.consumerGroup,
eventHubName: eventHubProperties.eventHubName,
fullyQualifiedNamespace: eventHubProperties.fullyQualifiedNamespace,
offset: 0,
partitionId: "0",
sequenceNumber: 1
};
try {
await checkpointStore.updateCheckpoint(checkpoint);
throw new Error("Test failure");
} catch (err) {
err.message.should.not.equal("Test failure");
}
});
});
describe("claimOwnership", function() {
it("claimOwnership call should succeed, if it has been called for the first time", async function(): Promise<
void
> {
const checkpointStore = new TableCheckpointStore(client);
const listOwnership = await checkpointStore.listOwnership(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup"
);
should.equal(listOwnership.length, 0);
const partitionOwnership: PartitionOwnership = {
ownerId: "Id1",
partitionId: "0",
fullyQualifiedNamespace: "testNamespace.servicebus.windows.net",
consumerGroup: "testConsumerGroup",
eventHubName: "testEventHub"
};
const partitionOwnershipArray = await checkpointStore.claimOwnership([partitionOwnership]);
should.equal(partitionOwnershipArray.length, 1);
const ownershipList = await checkpointStore.listOwnership(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup"
);
should.equal(ownershipList.length, 1, "Unexpected number of ownerships in list.");
should.equal(
ownershipList[0].ownerId,
"Id1",
"The 1st ownership item has the wrong ownerId."
);
should.equal(
ownershipList[0].consumerGroup,
"testConsumerGroup",
"The 1st ownership item has the wrong consumerGroup."
);
should.equal(
ownershipList[0].fullyQualifiedNamespace,
"testNamespace.servicebus.windows.net",
"The 1st fullyQualifiedNamespace item has the wrong fullyQualifiedNamespace."
);
should.equal(
ownershipList[0].eventHubName,
"testEventHub",
"The 1st ownership item has the wrong eventHubName."
);
should.equal(
ownershipList[0].partitionId,
"0",
"The 1st ownership item has the wrong partitionId."
);
should.exist(ownershipList[0].lastModifiedTimeInMs, "lastModifiedTimeInMs should exist.");
should.exist(ownershipList[0].etag, "etag should exist.");
debug(
`LastModifiedTime: ${ownershipList[0].lastModifiedTimeInMs!}, ETag: ${
ownershipList[0].etag
}`
);
});
});
});
describe("Runs tests on a populated table", function() {
const namespace = "blue.servicebus.windows.net";
const eventHubName = "blueHub";
const consumerGroup = "$default";
beforeEach("creating table", async () => {
/* Checkpoint */
const checkpoint_entity: CheckpointEntity = {
partitionKey: `${namespace} ${eventHubName} ${consumerGroup} Checkpoint`,
rowKey: "0",
sequencenumber: "100",
offset: "1023"
};
await client.createEntity(checkpoint_entity);
/* Ownership */
const ownership_entity: PartitionOwnershipEntity = {
partitionKey: `${namespace} ${eventHubName} ${consumerGroup} Ownership`,
rowKey: "0",
ownerid: "Id0"
};
await client.createEntity(ownership_entity);
});
describe("listOwnership", function() {
it("listOwnership should print an array of ownerships", async function() {
const checkpointStore = new TableCheckpointStore(client);
const listOwnership = await checkpointStore.listOwnership(
"blue.servicebus.windows.net",
"blueHub",
"$default"
);
should.equal(listOwnership.length, 1);
});
describe("listCheckpoints", function() {
it("listCheckpoints should print out an array of checkpoints", async function() {
const checkpointStore = new TableCheckpointStore(client);
const listCheckpoint = await checkpointStore.listCheckpoints(
"blue.servicebus.windows.net",
"blueHub",
"$default"
);
should.equal(listCheckpoint.length, 1);
});
});
describe("claimOwnership", function() {
// these errors happen when we have multiple consumers starting up
// at the same time and load balancing amongst themselves. This is a
// normal thing and shouldn't be reported to the user.
it("claimOwnership ignores errors about etags", async () => {
const checkpointStore = new TableCheckpointStore(client);
const listOwnership = await checkpointStore.listOwnership(
"blue.servicebus.windows.net",
"blueHub",
"$default"
);
const originalClaimedOwnerships = await checkpointStore.claimOwnership([
listOwnership[0]
]);
const originalETag = originalClaimedOwnerships[0].etag;
const newClaimedOwnerships = await checkpointStore.claimOwnership(
originalClaimedOwnerships
);
newClaimedOwnerships.length.should.equal(1);
newClaimedOwnerships.length.should.equal(1);
newClaimedOwnerships[0]!.etag!.should.not.equal(originalETag);
// we've now invalidated the previous ownership's etag so using the old etag will fail
const shouldNotThrowButNothingWillClaim = await checkpointStore.claimOwnership([
{
partitionId: "0",
consumerGroup: "$default",
fullyQualifiedNamespace: "blue.servicebus.windows.net",
eventHubName: "blueHub",
ownerId: "Id0",
etag: originalETag
}
]);
shouldNotThrowButNothingWillClaim.length.should.equal(0);
});
it("After multiple claimOwnership calls for a single partition, listOwnership should return an array with a single PartitionOwnership for that partition.", async function(): Promise<
void
> {
const checkpointStore = new TableCheckpointStore(client);
const listOwnership = await checkpointStore.listOwnership(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup"
);
should.equal(listOwnership.length, 0);
const partitionOwnershipArray: PartitionOwnership[] = [];
for (let index = 0; index < 3; index++) {
const partitionOwnership: PartitionOwnership = {
ownerId: "Id1",
partitionId: `${index}`,
fullyQualifiedNamespace: "testNamespace.servicebus.windows.net",
consumerGroup: "testConsumerGroup",
eventHubName: "testEventHub"
};
partitionOwnershipArray.push(partitionOwnership);
}
await checkpointStore.claimOwnership([partitionOwnershipArray[0]]);
await checkpointStore.claimOwnership([partitionOwnershipArray[1]]);
await checkpointStore.claimOwnership([partitionOwnershipArray[2]]);
const ownershipList = await checkpointStore.listOwnership(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup"
);
should.equal(ownershipList.length, 3, "Unexpected number of ownerships in list.");
should.equal(
ownershipList[0].ownerId,
"Id1",
"The 1st ownership item has the wrong ownerId."
);
should.equal(
ownershipList[0].consumerGroup,
"testConsumerGroup",
"The 1st ownership item has the wrong consumerGroup."
);
should.equal(
ownershipList[0].fullyQualifiedNamespace,
"testNamespace.servicebus.windows.net",
"The 1st fullyQualifiedNamespace item has the wrong fullyQualifiedNamespace."
);
should.equal(
ownershipList[0].eventHubName,
"testEventHub",
"The 1st ownership item has the wrong eventHubName."
);
should.equal(
ownershipList[0].partitionId,
"0",
"The 1st ownership item has the wrong partitionId."
);
should.exist(ownershipList[0].lastModifiedTimeInMs, "lastModifiedTimeInMs should exist.");
should.exist(ownershipList[0].etag, "etag should exist.");
debug(
`LastModifiedTime: ${ownershipList[0].lastModifiedTimeInMs!}, ETag: ${
ownershipList[0].etag
}`
);
});
it("After multiple claimOwnership calls for multiple partition, listOwnership should return an array with a single PartitionOwnership for each partition.", async function(): Promise<
void
> {
const checkpointStore = new TableCheckpointStore(client);
const listOwnership = await checkpointStore.listOwnership(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup"
);
should.equal(listOwnership.length, 0);
const partitionOwnershipArray: PartitionOwnership[] = [];
for (let index = 0; index < 3; index++) {
const partitionOwnership: PartitionOwnership = {
ownerId: "Id1",
partitionId: `${index}`,
fullyQualifiedNamespace: "testNamespace.servicebus.windows.net",
consumerGroup: "testConsumerGroup",
eventHubName: "testEventHub"
};
partitionOwnershipArray.push(partitionOwnership);
}
await checkpointStore.claimOwnership([partitionOwnershipArray[0]]);
await checkpointStore.claimOwnership([partitionOwnershipArray[1]]);
await checkpointStore.claimOwnership([partitionOwnershipArray[2]]);
const ownershipList = await checkpointStore.listOwnership(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup"
);
should.equal(ownershipList.length, 3, "Unexpected number of ownerships in list.");
should.equal(
ownershipList[0].ownerId,
"Id1",
"The 1st ownership item has the wrong ownerId."
);
should.equal(
ownershipList[0].consumerGroup,
"testConsumerGroup",
"The 1st ownership item has the wrong consumerGroup."
);
should.equal(
ownershipList[0].fullyQualifiedNamespace,
"testNamespace.servicebus.windows.net",
"The 1st fullyQualifiedNamespace item has the wrong fullyQualifiedNamespace."
);
should.equal(
ownershipList[0].eventHubName,
"testEventHub",
"The 1st ownership item has the wrong eventHubName."
);
should.equal(
ownershipList[0].eventHubName,
"testEventHub",
"The 1st ownership item has the wrong eventHubName."
);
should.equal(
ownershipList[0].partitionId,
"0",
"The 1st ownership item has the wrong partitionId."
);
should.exist(ownershipList[0].lastModifiedTimeInMs, "lastModifiedTimeInMs should exist.");
should.exist(ownershipList[0].etag, "etag should exist.");
should.equal(
ownershipList[1].partitionId,
"1",
"The 2nd ownership item has the wrong partitionId."
);
should.exist(ownershipList[1].lastModifiedTimeInMs, "lastModifiedTimeInMs should exist.");
should.exist(ownershipList[1].etag, "etag should exist.");
should.equal(
ownershipList[2].partitionId,
"2",
"The 3rd ownership item has the wrong partitionId."
);
should.exist(ownershipList[2].lastModifiedTimeInMs, "lastModifiedTimeInMs should exist.");
should.exist(ownershipList[2].etag, "etag should exist.");
});
});
});
describe("updateCheckpoint", function() {
it("updates checkpoints successfully", async () => {
const checkpointStore = new TableCheckpointStore(client);
const eventHubProperties = {
fullyQualifiedNamespace: "testNamespace.servicebus.windows.net",
eventHubName: "testEventHub",
consumerGroup: "testConsumerGroup"
};
let i = 0;
while (i < 3) {
const checkpoint: Checkpoint = {
...eventHubProperties,
partitionId: i.toString(),
sequenceNumber: 100 + i,
offset: 1023 + i
};
await checkpointStore.updateCheckpoint(checkpoint);
i++;
}
let checkpoints = await checkpointStore.listCheckpoints(
eventHubProperties.fullyQualifiedNamespace,
eventHubProperties.eventHubName,
eventHubProperties.consumerGroup
);
checkpoints.length.should.equal(3);
checkpoints.sort((a, b) => a.partitionId.localeCompare(b.partitionId));
for (i = 0; i < 3; ++i) {
const checkpoint = checkpoints[i];
checkpoint.partitionId.should.equal(i.toString());
checkpoint.fullyQualifiedNamespace.should.equal("testNamespace.servicebus.windows.net");
checkpoint.consumerGroup.should.equal("testConsumerGroup");
checkpoint.eventHubName.should.equal("testEventHub");
checkpoint.sequenceNumber!.should.equal(100 + i);
checkpoint.offset!.should.equal(1023 + i);
// now update it
checkpoint.offset++;
checkpoint.sequenceNumber++;
await checkpointStore.updateCheckpoint(checkpoint);
}
checkpoints = await checkpointStore.listCheckpoints(
eventHubProperties.fullyQualifiedNamespace,
eventHubProperties.eventHubName,
eventHubProperties.consumerGroup
);
checkpoints.length.should.equal(3);
checkpoints.sort((a, b) => a.partitionId.localeCompare(b.partitionId));
for (i = 0; i < 3; ++i) {
const checkpoint = checkpoints[i];
checkpoint.partitionId.should.equal(i.toString());
checkpoint.fullyQualifiedNamespace.should.equal("testNamespace.servicebus.windows.net");
checkpoint.consumerGroup.should.equal("testConsumerGroup");
checkpoint.eventHubName.should.equal("testEventHub");
checkpoint.sequenceNumber!.should.equal(100 + i + 1);
checkpoint.offset!.should.equal(1023 + i + 1);
}
});
});
});
});

Просмотреть файл

@ -1,12 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import chai from "chai";
const should = chai.should();
import { TableCheckpointStore } from "../src";
/* test to show that test framework is set up well */
describe("TableCheckpointStore", () => {
it("is exported from the package", () => {
should.exist(TableCheckpointStore, "Expected TableCheckpointStore to be exported.");
});
});

Просмотреть файл

@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import * as dotenv from "dotenv";
dotenv.config();
declare const self: any;
export const isNode =
!!process && !!process.version && !!process.versions && !!process.versions.node;
export enum EnvVarKeys {
STORAGE_ACCOUNT_NAME = "STORAGE_ACCOUNT_NAME",
STORAGE_ACCOUNT_KEY = "STORAGE_ACCOUNT_KEY"
}
function getEnvVarValue(name: string): string | undefined {
if (isNode) {
return process.env[name];
} else {
return self.__env__[name];
}
}
export function getEnvVars(): { [key in EnvVarKeys]: any } {
return {
[EnvVarKeys.STORAGE_ACCOUNT_KEY]: getEnvVarValue(EnvVarKeys.STORAGE_ACCOUNT_KEY),
[EnvVarKeys.STORAGE_ACCOUNT_NAME]: getEnvVarValue(EnvVarKeys.STORAGE_ACCOUNT_NAME)
};
}

Просмотреть файл

@ -197,6 +197,14 @@
}
],
"outputs": {
"STORAGE_ACCOUNT_NAME": {
"type": "string",
"value": "[variables('storageAccountName')]"
},
"STORAGE_ACCOUNT_KEY": {
"type": "string",
"value": "[listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName')), variables('storageApiVersion')).keys[0].value]"
},
"EVENTHUB_NAME": {
"type": "string",
"value": "[variables('eventHubName')]"