[eventhubs-checkpointstore-blob] Updates CheckpointStore methods to support cancellation (#13862)

* [event-hubs] update CheckpointStore interface methods to accept optional OperationOptions

* [eventhubs-checkpointstore-blob] update BlobCheckpointStore async methods to support cancellation and passthrough of tracing options

* [eventhubs-checkpointstore-blob] update API review file
This commit is contained in:
chradek 2021-02-19 11:43:03 -08:00 коммит произвёл GitHub
Родитель da8f04bbc7
Коммит 55150cc8f8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 322 добавлений и 31 удалений

Просмотреть файл

@ -1,7 +1,10 @@
# Release History
## 5.4.1 (Unreleased)
## 5.5.0 (Unreleased)
- Updates the methods on the `CheckpointStore` interface to accept
an optional `options` parameter that can be used to pass in an
`abortSignal` and `tracingOptions`.
## 5.4.0 (2021-02-09)

Просмотреть файл

@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.4.1",
"version": "5.5.0",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",

Просмотреть файл

@ -27,10 +27,10 @@ export interface Checkpoint {
// @public
export interface CheckpointStore {
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<Checkpoint[]>;
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
claimOwnership(partitionOwnership: PartitionOwnership[], options?: OperationOptions): Promise<PartitionOwnership[]>;
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string, options?: OperationOptions): Promise<Checkpoint[]>;
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string, options?: OperationOptions): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint, options?: OperationOptions): Promise<void>;
}
// @public

Просмотреть файл

@ -13,6 +13,7 @@ import { CommonEventProcessorOptions } from "./models/private";
import { CloseReason } from "./models/public";
import { ConnectionContext } from "./connectionContext";
import { LoadBalancingStrategy } from "./loadBalancerStrategies/loadBalancingStrategy";
import { OperationOptions } from "./util/operationOptions";
/**
* An interface representing the details on which instance of a `EventProcessor` owns processing
@ -73,28 +74,41 @@ export interface CheckpointStore {
* <yournamespace>.servicebus.windows.net.
* @param eventHubName - The event hub name.
* @param consumerGroup - The consumer group name.
* @param options - A set of options that can be specified to influence the behavior of this method.
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
* @returns A list of partition ownership details of all the partitions that have/had an owner.
*/
listOwnership(
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroup: string
consumerGroup: string,
options?: OperationOptions
): Promise<PartitionOwnership[]>;
/**
* Called to claim ownership of a list of partitions. This will return the list of partitions that were owned
* successfully.
*
* @param partitionOwnership - The list of partition ownership this instance is claiming to own.
* @param options - A set of options that can be specified to influence the behavior of this method.
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
* @returns A list of partitions this instance successfully claimed ownership.
*/
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
claimOwnership(
partitionOwnership: PartitionOwnership[],
options?: OperationOptions
): Promise<PartitionOwnership[]>;
/**
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint - The checkpoint.
* @param options - A set of options that can be specified to influence the behavior of this method.
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
*/
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
updateCheckpoint(checkpoint: Checkpoint, options?: OperationOptions): Promise<void>;
/**
* Lists all the checkpoints in a data store for a given namespace, eventhub and consumer group.
@ -103,11 +117,16 @@ export interface CheckpointStore {
* <yournamespace>.servicebus.windows.net.
* @param eventHubName - The event hub name.
* @param consumerGroup - The consumer group name.
* @param options - A set of options that can be specified to influence the behavior of this method.
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
* @returns A list of checkpoints for a given namespace, eventhub, and consumer group.
*/
listCheckpoints(
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroup: string
consumerGroup: string,
options?: OperationOptions
): Promise<Checkpoint[]>;
}

Просмотреть файл

@ -6,5 +6,5 @@
*/
export const packageJsonInfo = {
name: "@azure/event-hubs",
version: "5.4.1"
version: "5.5.0"
};

Просмотреть файл

@ -1,7 +1,11 @@
# Release History
## 1.0.2 (Unreleased)
## 1.1.0 (Unreleased)
- Updates all async methods on `BlobCheckpointStore` to accept
an optional `options` parameter that can be used to pass in an
`abortSignal` and `tracingOptions`.
Resolves issue [#9492](https://github.com/Azure/azure-sdk-for-js/issues/9492).
## 1.0.1 (2020-08-03)

Просмотреть файл

@ -1,7 +1,7 @@
{
"name": "@azure/eventhubs-checkpointstore-blob",
"sdk-type": "client",
"version": "1.0.2",
"version": "1.1.0",
"description": "An Azure Storage Blob solution to store checkpoints when using Event Hubs.",
"author": "Microsoft Corporation",
"license": "MIT",
@ -60,7 +60,8 @@
"docs": "typedoc --excludePrivate --excludeNotExported --excludeExternals --stripInternal --mode file --out ./dist/docs ./src"
},
"dependencies": {
"@azure/event-hubs": "^5.0.0",
"@azure/abort-controller": "^1.0.0",
"@azure/event-hubs": "^5.5.0",
"@azure/logger": "^1.0.0",
"@azure/storage-blob": "^12.4.1",
"events": "^3.0.0",

Просмотреть файл

@ -7,15 +7,16 @@
import { Checkpoint } from '@azure/event-hubs';
import { CheckpointStore } from '@azure/event-hubs';
import { ContainerClient } from '@azure/storage-blob';
import { OperationOptions } from '@azure/event-hubs';
import { PartitionOwnership } from '@azure/event-hubs';
// @public
export class BlobCheckpointStore implements CheckpointStore {
constructor(containerClient: ContainerClient);
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<Checkpoint[]>;
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
claimOwnership(partitionOwnership: PartitionOwnership[], options?: OperationOptions): Promise<PartitionOwnership[]>;
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string, options?: OperationOptions): Promise<Checkpoint[]>;
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string, options?: OperationOptions): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint, options?: OperationOptions): Promise<void>;
}
// @public

Просмотреть файл

@ -1,7 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { CheckpointStore, PartitionOwnership, Checkpoint } from "@azure/event-hubs";
import {
CheckpointStore,
PartitionOwnership,
Checkpoint,
OperationOptions
} from "@azure/event-hubs";
import { ContainerClient, Metadata, RestError, BlobSetMetadataResponse } from "@azure/storage-blob";
import { logger, logErrorStackTrace } from "./log";
import { throwTypeErrorIfParameterMissing } from "./util/error";
@ -24,14 +29,19 @@ export class BlobCheckpointStore implements CheckpointStore {
* <yournamespace>.servicebus.windows.net.
* @param eventHubName - The event hub name.
* @param consumerGroup - The consumer group name.
* @param options - A set of options that can be specified to influence the behavior of this method.
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
* @returns Partition ownership details of all the partitions that have had an owner.
*/
async listOwnership(
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroup: string
consumerGroup: string,
options: OperationOptions = {}
): Promise<PartitionOwnership[]> {
const partitionOwnershipArray: PartitionOwnership[] = [];
const { abortSignal, tracingOptions } = options;
const blobPrefix = BlobCheckpointStore.getBlobPrefix({
type: "ownership",
@ -42,8 +52,10 @@ export class BlobCheckpointStore implements CheckpointStore {
try {
const blobs = this._containerClient.listBlobsFlat({
abortSignal,
includeMetadata: true,
prefix: blobPrefix
prefix: blobPrefix,
tracingOptions
});
for await (const blob of blobs) {
@ -72,6 +84,9 @@ export class BlobCheckpointStore implements CheckpointStore {
} catch (err) {
logger.warning(`Error occurred while fetching the list of blobs`, err.message);
logErrorStackTrace(err);
if (err?.name === "AbortError") throw err;
throw new Error(`Error occurred while fetching the list of blobs. \n${err}`);
}
}
@ -81,9 +96,15 @@ export class BlobCheckpointStore implements CheckpointStore {
* successfully claimed.
*
* @param partitionOwnership - The list of partition ownership this instance is claiming to own.
* @param options - A set of options that can be specified to influence the behavior of this method.
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
* @returns A list partitions this instance successfully claimed ownership.
*/
async claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]> {
async claimOwnership(
partitionOwnership: PartitionOwnership[],
options: OperationOptions = {}
): Promise<PartitionOwnership[]> {
const partitionOwnershipArray: PartitionOwnership[] = [];
for (const ownership of partitionOwnership) {
const blobName = BlobCheckpointStore.getBlobPrefix({ type: "ownership", ...ownership });
@ -93,7 +114,8 @@ export class BlobCheckpointStore implements CheckpointStore {
{
ownerid: ownership.ownerId
},
ownership.etag
ownership.etag,
options
);
if (updatedBlobResponse.lastModified) {
@ -138,12 +160,17 @@ export class BlobCheckpointStore implements CheckpointStore {
* <yournamespace>.servicebus.windows.net.
* @param eventHubName - The event hub name.
* @param consumerGroup - The consumer group name.
* @param options - A set of options that can be specified to influence the behavior of this method.
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
*/
async listCheckpoints(
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroup: string
consumerGroup: string,
options: OperationOptions = {}
): Promise<Checkpoint[]> {
const { abortSignal, tracingOptions } = options;
const blobPrefix = BlobCheckpointStore.getBlobPrefix({
type: "checkpoint",
fullyQualifiedNamespace,
@ -152,8 +179,10 @@ export class BlobCheckpointStore implements CheckpointStore {
});
const blobs = this._containerClient.listBlobsFlat({
abortSignal,
includeMetadata: true,
prefix: blobPrefix
prefix: blobPrefix,
tracingOptions
});
const checkpoints: Checkpoint[] = [];
@ -188,9 +217,12 @@ export class BlobCheckpointStore implements CheckpointStore {
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint - The checkpoint.
* @param options - A set of options that can be specified to influence the behavior of this method.
* - `abortSignal`: A signal used to request operation cancellation.
* - `tracingOptions`: Options for configuring tracing.
* @returns The new etag on successful update.
*/
async updateCheckpoint(checkpoint: Checkpoint): Promise<void> {
async updateCheckpoint(checkpoint: Checkpoint, options: OperationOptions = {}): Promise<void> {
throwTypeErrorIfParameterMissing(
"updateCheckpoint",
"sequenceNumber",
@ -206,7 +238,8 @@ export class BlobCheckpointStore implements CheckpointStore {
sequencenumber: checkpoint.sequenceNumber.toString(),
offset: checkpoint.offset.toString()
},
undefined
undefined,
options
);
logger.verbose(
@ -222,6 +255,9 @@ export class BlobCheckpointStore implements CheckpointStore {
err.message
);
logErrorStackTrace(err);
if (err?.name === "AbortError") throw err;
throw new Error(
`Error occurred while upating the checkpoint for partition: ${checkpoint.partitionId}, ${err}`
);
@ -251,24 +287,31 @@ export class BlobCheckpointStore implements CheckpointStore {
private async _setBlobMetadata(
blobName: string,
metadata: OwnershipMetadata | CheckpointMetadata,
etag: string | undefined
etag: string | undefined,
options: OperationOptions = {}
): Promise<BlobSetMetadataResponse> {
const { abortSignal, tracingOptions } = options;
const blockBlobClient = this._containerClient.getBlobClient(blobName).getBlockBlobClient();
// When we have an etag, we know the blob existed.
// If we encounter an error we should fail.
if (etag) {
return blockBlobClient.setMetadata(metadata as Metadata, {
abortSignal,
conditions: {
ifMatch: etag
}
},
tracingOptions
});
} else {
try {
// Attempt to set metadata, and fallback to upload if the blob doesn't already exist.
// This avoids poor performance in storage accounts with soft-delete or blob versioning enabled.
// https://github.com/Azure/azure-sdk-for-js/issues/10132
return await blockBlobClient.setMetadata(metadata as Metadata);
return await blockBlobClient.setMetadata(metadata as Metadata, {
abortSignal,
tracingOptions
});
} catch (err) {
// Check if the error is `BlobNotFound` and fallback to `upload` if it is.
if (err?.name !== "RestError") {
@ -281,7 +324,9 @@ export class BlobCheckpointStore implements CheckpointStore {
}
return blockBlobClient.upload("", 0, {
metadata: metadata as Metadata
abortSignal,
metadata: metadata as Metadata,
tracingOptions
});
}
}

Просмотреть файл

@ -16,9 +16,11 @@ import { PartitionOwnership, Checkpoint, EventHubConsumerClient } from "@azure/e
import { Guid } from "guid-typescript";
import { parseIntOrThrow } from "../src/blobCheckpointStore";
import { fail } from "assert";
import { AbortController } from "@azure/abort-controller";
const env = getEnvVars();
describe("Blob Checkpoint Store", function(): void {
const TEST_FAILURE = "Test failure";
const service = {
storageConnectionString: env[EnvVarKeys.STORAGE_CONNECTION_STRING]
};
@ -42,6 +44,56 @@ describe("Blob Checkpoint Store", function(): void {
await containerClient.delete();
});
describe("listOwnership", function() {
it("supports cancellation via abortSignal", async function() {
const checkpointStore = new BlobCheckpointStore(containerClient);
// Create an abort controller and abort it after blocking code is ran.
const abortController = new AbortController();
setTimeout(() => abortController.abort(), 0);
const signal = abortController.signal;
try {
await checkpointStore.listOwnership(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup",
{
abortSignal: signal
}
);
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.name, "AbortError");
should.not.equal(err.message, TEST_FAILURE);
}
});
it("supports cancellation via abortSignal (pre-cancelled)", async function() {
const checkpointStore = new BlobCheckpointStore(containerClient);
// Create an abort controller and immediately abort it.
const abortController = new AbortController();
abortController.abort();
const signal = abortController.signal;
try {
await checkpointStore.listOwnership(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup",
{
abortSignal: signal
}
);
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.name, "AbortError");
should.not.equal(err.message, TEST_FAILURE);
}
});
});
it("listOwnership should return an empty array", async function(): Promise<void> {
const checkpointStore = new BlobCheckpointStore(containerClient);
const listOwnership = await checkpointStore.listOwnership(
@ -52,6 +104,68 @@ describe("Blob Checkpoint Store", function(): void {
should.equal(listOwnership.length, 0);
});
describe("claimOwnership", function() {
it("supports cancellation via abortSignal", async function() {
const checkpointStore = new BlobCheckpointStore(containerClient);
// Create an abort controller and abort it after blocking code is ran.
const abortController = new AbortController();
setTimeout(() => abortController.abort(), 0);
const signal = abortController.signal;
try {
await checkpointStore.claimOwnership(
[
{
partitionId: "0",
consumerGroup: EventHubConsumerClient.defaultConsumerGroupName,
fullyQualifiedNamespace: "fqdn",
eventHubName: "ehname",
ownerId: "me"
}
],
{
abortSignal: signal
}
);
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.name, "AbortError");
should.not.equal(err.message, TEST_FAILURE);
}
});
it("supports cancellation via abortSignal (pre-cancelled)", async function() {
const checkpointStore = new BlobCheckpointStore(containerClient);
// Create an abort controller and immediately abort it.
const abortController = new AbortController();
abortController.abort();
const signal = abortController.signal;
try {
await checkpointStore.claimOwnership(
[
{
partitionId: "0",
consumerGroup: EventHubConsumerClient.defaultConsumerGroupName,
fullyQualifiedNamespace: "fqdn",
eventHubName: "ehname",
ownerId: "me"
}
],
{
abortSignal: signal
}
);
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.name, "AbortError");
should.not.equal(err.message, TEST_FAILURE);
}
});
});
// these errors happen when we have multiple consumers starting up
// at the same time and load balancing amongst themselves. This is a
// normal thing and shouldn't be reported to the user.
@ -250,6 +364,56 @@ describe("Blob Checkpoint Store", function(): void {
ownershipList[2].etag!.should.not.undefined;
});
describe("listCheckpoints", function() {
it("supports cancellation via abortSignal", async function() {
const checkpointStore = new BlobCheckpointStore(containerClient);
// Create an abort controller and abort it after blocking code is ran.
const abortController = new AbortController();
setTimeout(() => abortController.abort(), 0);
const signal = abortController.signal;
try {
await checkpointStore.listCheckpoints(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup",
{
abortSignal: signal
}
);
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.name, "AbortError");
should.not.equal(err.message, TEST_FAILURE);
}
});
it("supports cancellation via abortSignal (pre-cancelled)", async function() {
const checkpointStore = new BlobCheckpointStore(containerClient);
// Create an abort controller and immediately abort it.
const abortController = new AbortController();
abortController.abort();
const signal = abortController.signal;
try {
await checkpointStore.listCheckpoints(
"testNamespace.servicebus.windows.net",
"testEventHub",
"testConsumerGroup",
{
abortSignal: signal
}
);
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.name, "AbortError");
should.not.equal(err.message, TEST_FAILURE);
}
});
});
describe("updateCheckpoint()", () => {
it("updates checkpoints successfully", async () => {
const checkpointStore = new BlobCheckpointStore(containerClient);
@ -385,6 +549,60 @@ describe("Blob Checkpoint Store", function(): void {
err.message.should.not.equal("Test failure");
}
});
it("supports cancellation via abortSignal", async function() {
const checkpointStore = new BlobCheckpointStore(containerClient);
// Create an abort controller and abort it after blocking code is ran.
const abortController = new AbortController();
setTimeout(() => abortController.abort(), 0);
const signal = abortController.signal;
// Create the checkpoint to add.
const checkpoint: Checkpoint = {
consumerGroup: "testNamespace.servicebus.windows.net",
eventHubName: "testEventHub",
fullyQualifiedNamespace: "testConsumerGroup",
offset: 0,
partitionId: "0",
sequenceNumber: 1
};
try {
await checkpointStore.updateCheckpoint(checkpoint, { abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.name, "AbortError");
should.not.equal(err.message, TEST_FAILURE);
}
});
it("supports cancellation via abortSignal (pre-cancelled)", async function() {
const checkpointStore = new BlobCheckpointStore(containerClient);
// Create an abort controller and immediately abort it.
const abortController = new AbortController();
abortController.abort();
const signal = abortController.signal;
// Create the checkpoint to add.
const checkpoint: Checkpoint = {
consumerGroup: "testNamespace.servicebus.windows.net",
eventHubName: "testEventHub",
fullyQualifiedNamespace: "testConsumerGroup",
offset: 0,
partitionId: "0",
sequenceNumber: 1
};
try {
await checkpointStore.updateCheckpoint(checkpoint, { abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.name, "AbortError");
should.not.equal(err.message, TEST_FAILURE);
}
});
});
it("Claiming ownership with an empty owner id should be fine (ie, unclaiming)", async function(): Promise<