[core-amqp][event-hubs] Fix "too much pending tasks" by making acquire lock calls cancellable (#14844)

* [core-amqp] adds defaultCancellableLock

* [core-amqp] make cbs acquireLock call cancellable

* [core-amqp] update CancellableAsyncLock to throw OperationTimeoutError if acquireTimeoutInMs is reached

* [core-amqp] fix eslint errors

* [event-hubs] add timeouts to acquire calls

* pass abortSignal to init/negotiateClaim methods

* update pnpm-lock.yaml

* [core-amqp] make flaky tests not flaky

* [core-amqp] make fields required

* [core-amqp] AcquireOptions -> AcquireLockProperties, add defaultCancellableLock back, remove unneeded code from allSettled helper util

* [core-amqp] parameter rename cleanup

* [event-hubs] add timeout to link initialization calls

* update pnpm-lock.yaml

* [event-hubs] improve timeout to cbsSession.negotiateClaimLock

* [core-amqp] add isOpen() to CbsClient

* [event-hubs] remove unneeded AbortError branch from event hub receiver

* [core-amqp] fix flaky test in node 15

* [event-hubs] use cbs isOpen()

* [core-amqp] add timeout to CbsClient init and negotiateClaim methods

* [event-hubs] pass timeout through to CbsClient init and negotiateClaim methods
This commit is contained in:
chradek 2021-04-19 16:57:29 -07:00 коммит произвёл GitHub
Родитель 319c8905e0
Коммит 4027106490
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
17 изменённых файлов: 869 добавлений и 141 удалений

Просмотреть файл

@ -157,7 +157,7 @@ packages:
jssha: 3.2.0
process: 0.11.10
rhea: 1.0.24
rhea-promise: 1.2.0
rhea-promise: 1.2.1
tslib: 2.2.0
url: 0.11.0
util: 0.12.3
@ -494,7 +494,7 @@ packages:
jssha: 3.2.0
long: 4.0.0
process: 0.11.10
rhea-promise: 1.2.0
rhea-promise: 1.2.1
tslib: 2.2.0
dev: false
engines:
@ -6245,14 +6245,14 @@ packages:
dev: false
resolution:
integrity: sha512-+6uilZXSJGyiqVeHQI3Krv6NTAd8cWRCY2uyCxmzR4/5IFtBqqFem1HV2OiwSj0Gu7OFChIJDfH2JyjN7J0vRA==
/rhea-promise/1.2.0:
/rhea-promise/1.2.1:
dependencies:
debug: 3.2.7
rhea: 1.0.24
tslib: 1.14.1
dev: false
resolution:
integrity: sha512-92Bi1bQ5RfbYZW3880F+RkUvu5RTYCtM2tvMaAAj1fyAqi9FTPifmg5gvn69CaOPV63Kn9MOvUyXXDAtiIWILw==
integrity: sha512-m0aa+/TM6Cl5qu+mHNPn7aadNf1525WxpKwQKINP/knvoi4otB74G16iPDoTDbnGcJo1lc0AQEbVku8Gdoqmuw==
/rhea/1.0.24:
dependencies:
debug: 3.2.7
@ -8446,7 +8446,7 @@ packages:
process: 0.11.10
puppeteer: 3.3.0
rhea: 1.0.24
rhea-promise: 1.2.0
rhea-promise: 1.2.1
rimraf: 3.0.2
rollup: 1.32.1
rollup-plugin-shim: 1.0.0
@ -8463,7 +8463,7 @@ packages:
dev: false
name: '@rush-temp/core-amqp'
resolution:
integrity: sha512-sJYJQ0Y+WhmvT42SCKcWh9klwMSZcxUiIkZdsDqH4oYULnqCRkx643r36KDPPQHtXUTMCNYi6t9J+F1dqZIjkg==
integrity: sha512-CW9xpW4t7TdHmCyP+ZPd/mb0oUjG2dVh6C/RGhzPS8ImdZvcY13QO84eWugbS0PajYfHbIgxbMOiPUaxRTpxeQ==
tarball: file:projects/core-amqp.tgz
version: 0.0.0
file:projects/core-asynciterator-polyfill.tgz:
@ -9265,7 +9265,7 @@ packages:
prettier: 1.19.1
process: 0.11.10
puppeteer: 3.3.0
rhea-promise: 1.2.0
rhea-promise: 1.2.1
rimraf: 3.0.2
rollup: 1.32.1
rollup-plugin-shim: 1.0.0
@ -9281,7 +9281,7 @@ packages:
dev: false
name: '@rush-temp/event-hubs'
resolution:
integrity: sha512-6umPhd37FwVbr5aPJFphiT8yNGBerqKy2SMM5dUZIQPYk1qpEDzg8vpx0jfoqZdvMHXJd6BjzNN3YZbV8Q+GWg==
integrity: sha512-8kvTC/cK5o9v8o0ZupbFtRjjOGERHFFr3srAOwWo+27nBCMo8F5hw9o9oHBMghESkh0uZFaiAzZG79/g7pFNxg==
tarball: file:projects/event-hubs.tgz
version: 0.0.0
file:projects/event-processor-host.tgz:
@ -10395,7 +10395,7 @@ packages:
process: 0.11.10
promise: 8.1.0
puppeteer: 3.3.0
rhea-promise: 1.2.0
rhea-promise: 1.2.1
rimraf: 3.0.2
rollup: 1.32.1
rollup-plugin-shim: 1.0.0
@ -10410,7 +10410,7 @@ packages:
dev: false
name: '@rush-temp/service-bus'
resolution:
integrity: sha512-5sxo4QifhdJl5OzqCWEj7aM9ZTtoS5FOBG6GOLgzvZ9YBYKRZQu8klmx9pmT3bgjMJxF00/GaE0KhBOMC6sjFA==
integrity: sha512-wYyGEJH+Xk4XpboHfLFEgWMQdfUDhpdU3xcykhlEN4knMb9GIyaKbAttspaHPQ8+g4xB7Mjh5BneIhyZKkiFCA==
tarball: file:projects/service-bus.tgz
version: 0.0.0
file:projects/storage-blob-changefeed.tgz:

Просмотреть файл

@ -79,7 +79,7 @@
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea": "^1.0.24",
"rhea-promise": "^1.2.0",
"rhea-promise": "^1.2.1",
"tslib": "^2.0.0",
"url": "^0.11.0",
"util": "^0.12.1"

Просмотреть файл

@ -22,6 +22,12 @@ import { SenderOptions } from 'rhea-promise';
import { Session } from 'rhea-promise';
import { WebSocketImpl } from 'rhea-promise';
// @public
export interface AcquireLockProperties {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number | undefined;
}
// @public
export interface AmqpAnnotatedMessage {
applicationProperties?: {
@ -87,6 +93,11 @@ export const AmqpMessageProperties: {
export { AsyncLock }
// @public
export interface CancellableAsyncLock {
acquire<T = void>(key: string, task: (...args: any[]) => Promise<T>, properties: AcquireLockProperties): Promise<T>;
}
// @public
export class CbsClient {
constructor(connection: Connection, connectionLock: string);
@ -97,9 +108,12 @@ export class CbsClient {
readonly endpoint: string;
init(options?: {
abortSignal?: AbortSignalLike;
timeoutInMs?: number;
}): Promise<void>;
isOpen(): boolean;
negotiateClaim(audience: string, token: string, tokenType: TokenType, options?: {
abortSignal?: AbortSignalLike;
timeoutInMs?: number;
}): Promise<CbsResponse>;
remove(): void;
readonly replyTo: string;
@ -351,6 +365,9 @@ export function createSasTokenProvider(data: {
sharedAccessSignature: string;
} | NamedKeyCredential | SASCredential): SasTokenProvider;
// @public
export const defaultCancellableLock: CancellableAsyncLock;
// @public
export const defaultLock: AsyncLock;

Просмотреть файл

@ -16,7 +16,7 @@ import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { Constants } from "./util/constants";
import { logErrorStackTrace, logger } from "./log";
import { StandardAbortMessage, translate } from "./errors";
import { defaultLock } from "./util/utils";
import { defaultCancellableLock } from "./util/utils";
import { RequestResponseLink } from "./requestResponseLink";
/**
@ -76,8 +76,8 @@ export class CbsClient {
* For example, `abortSignal` can be passed to allow cancelling an in-progress `init` invocation.
* @returns Promise<void>.
*/
async init(options: { abortSignal?: AbortSignalLike } = {}): Promise<void> {
const { abortSignal } = options;
async init(options: { abortSignal?: AbortSignalLike; timeoutInMs?: number } = {}): Promise<void> {
const { abortSignal, timeoutInMs } = options;
try {
if (abortSignal?.aborted) {
@ -87,12 +87,16 @@ export class CbsClient {
// Acquire the lock and establish an amqp connection if it does not exist.
if (!this.connection.isOpen()) {
logger.verbose("The CBS client is trying to establish an AMQP connection.");
await defaultLock.acquire(this.connectionLock, () => {
return this.connection.open({ abortSignal });
});
await defaultCancellableLock.acquire(
this.connectionLock,
() => {
return this.connection.open({ abortSignal });
},
{ abortSignal: abortSignal, timeoutInMs: timeoutInMs }
);
}
if (!this._isCbsSenderReceiverLinkOpen()) {
if (!this.isOpen()) {
const rxOpt: ReceiverOptions = {
source: {
address: this.endpoint
@ -199,9 +203,9 @@ export class CbsClient {
audience: string,
token: string,
tokenType: TokenType,
options: { abortSignal?: AbortSignalLike } = {}
options: { abortSignal?: AbortSignalLike; timeoutInMs?: number } = {}
): Promise<CbsResponse> {
const { abortSignal } = options;
const { abortSignal, timeoutInMs } = options;
try {
if (abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
@ -224,6 +228,7 @@ export class CbsClient {
};
const responseMessage = await this._cbsSenderReceiverLink.sendRequest(request, {
abortSignal,
timeoutInMs,
requestName: "negotiateClaim"
});
logger.verbose("[%s] The CBS response is: %O", this.connection.id, responseMessage);
@ -246,7 +251,7 @@ export class CbsClient {
*/
async close(): Promise<void> {
try {
if (this._isCbsSenderReceiverLinkOpen()) {
if (this.isOpen()) {
const cbsLink = this._cbsSenderReceiverLink;
this._cbsSenderReceiverLink = undefined;
await cbsLink!.close();
@ -284,8 +289,8 @@ export class CbsClient {
* Indicates whether the cbs sender receiver link is open or closed.
* @returns `true` open, `false` closed.
*/
private _isCbsSenderReceiverLinkOpen(): boolean {
return this._cbsSenderReceiverLink! && this._cbsSenderReceiverLink!.isOpen();
public isOpen(): boolean {
return Boolean(this._cbsSenderReceiverLink?.isOpen());
}
private _fromRheaMessageResponse(msg: RheaMessage): CbsResponse {

Просмотреть файл

@ -33,6 +33,7 @@ export {
export {
delay,
parseConnectionString,
defaultCancellableLock,
defaultLock,
ParsedOutput,
AsyncLock,
@ -41,3 +42,4 @@ export {
export { AmqpAnnotatedMessage } from "./amqpAnnotatedMessage";
export { logger } from "./log";
export * from "./internals";
export { AcquireLockProperties, CancellableAsyncLock } from "./util/lock";

Просмотреть файл

@ -0,0 +1,253 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { OperationTimeoutError } from "rhea-promise";
import { StandardAbortMessage } from "../errors";
import { logger } from "../log";
/**
* Describes the properties that must be provided while acquiring a lock.
*/
export interface AcquireLockProperties {
/**
* An implementation of the `AbortSignalLike` interface to signal the request to cancel lock acquisition.
* This does not cancel running the task passed to `acquire()` if the lock has been acquired,
* but will prevent it from running if cancelled before the task is invoked.
*/
abortSignal: AbortSignalLike | undefined;
/**
* The allowed amount of time in milliseconds to acquire a lock.
* If a lock isn't acquired within this time, the promise returned
* by `acquire()` will be rejected with an Error.
*/
timeoutInMs: number | undefined;
}
/**
* Describes the components related to a specific task.
* @internal
*/
interface TaskDetails {
abortListener?: () => void;
abortSignal?: AbortSignalLike;
resolve: (value: unknown) => void;
reject: (reason: Error) => void;
task: (...args: any[]) => Promise<unknown>;
tid?: ReturnType<typeof setTimeout>;
}
/**
* CancellableAsyncLock provides a mechanism for forcing tasks using the same
* 'key' to be executed serially.
*
* Pending tasks can be manually cancelled via an abortSignal or automatically
* cancelled by reach a provided timeout value.
*/
export interface CancellableAsyncLock {
/**
* Returns a promise that resolves to the value returned by the provided task function.
* Only 1 task can be invoked at a time for a given `key` value.
*
* An acquire call can be cancelled via an `abortSignal`.
* If cancelled, the promise will be rejected with an `AbortError`.
*
* `acquireTimeoutInMs` can also be provided to properties.
* If the timeout is reached before the provided `task` is invoked,
* then the promise will be rejected with an Error stating the task
* timed out waiting to acquire a lock.
*
* @param key - All `acquire` calls are grouped by the provided `key`.
* @param task - The function to invoke once the lock has been acquired.
* @param properties - Additional properties to control the behavior of `acquire`.
*/
acquire<T = void>(
key: string,
task: (...args: any[]) => Promise<T>,
properties: AcquireLockProperties
): Promise<T>;
}
/**
* This class is used to coordinate executing tasks that should not be run in parallel.
* @internal
*/
export class CancellableAsyncLockImpl {
private _keyMap = new Map<string, TaskDetails[]>();
private _executionRunningSet = new Set<string>();
/**
* Returns a promise that resolves to the value returned by the provided task function.
* Only 1 task can be invoked at a time for a given `key` value.
*
* An acquire call can be cancelled via an `abortSignal`.
* If cancelled, the promise will be rejected with an `AbortError`.
*
* `acquireTimeoutInMs` can also be provided to properties.
* If the timeout is reached before the provided `task` is invoked,
* then the promise will be rejected with an Error stating the task
* timed out waiting to acquire a lock.
*
* @param key - All `acquire` calls are grouped by the provided `key`.
* @param task - The function to invoke once the lock has been acquired.
* @param properties - Additional properties to control the behavior of `acquire`.
*/
acquire<T = void>(
key: string,
task: (...args: any[]) => Promise<T>,
properties: AcquireLockProperties
): Promise<T> {
const { abortSignal, timeoutInMs } = properties;
// Fast exit if the operation is already cancelled.
if (abortSignal?.aborted) {
return Promise.reject(new AbortError(StandardAbortMessage));
}
// Ensure we've got a task queue for the given key.
const taskQueue = this._keyMap.get(key) ?? [];
this._keyMap.set(key, taskQueue);
// This method will return a promise that will be fulfilled outside this function.
const { promise, rejecter, resolver } = getPromiseParts();
const taskDetails: TaskDetails = {
reject: rejecter,
resolve: resolver,
task
};
// Handle timeouts by removing the task from the queue when hit.
if (typeof timeoutInMs === "number") {
const tid = setTimeout(() => {
this._removeTaskDetails(key, taskDetails);
rejecter(
new OperationTimeoutError(`The task timed out waiting to acquire a lock for ${key}`)
);
}, timeoutInMs);
taskDetails.tid = tid;
}
// Handle cancellation by removing the task from the queue when cancelled.
if (abortSignal) {
const abortListener = (): void => {
this._removeTaskDetails(key, taskDetails);
rejecter(new AbortError(StandardAbortMessage));
};
abortSignal.addEventListener("abort", abortListener);
taskDetails.abortSignal = abortSignal;
taskDetails.abortListener = abortListener;
}
// Enqueue the task!
taskQueue.push(taskDetails);
logger.verbose(
`Called acquire() for lock "${key}". Lock "${key}" has ${taskQueue.length} pending tasks.`
);
// Start a loop to iterate over the task queue.
// This will run asynchronously and won't allow
// more than 1 concurrent execution per key at a time.
this._execute(key);
return promise as Promise<T>;
}
/**
* Iterates over all the pending tasks for a given `key` serially.
*
* Note: If the pending tasks are already being iterated by an early
* _execute invocation, this returns immediately.
* @returns
*/
private async _execute(key: string): Promise<void> {
// If the key already exists in the set, then exit because
// tasks are already being processed.
if (this._executionRunningSet.has(key)) {
return;
}
const taskQueue = this._keyMap.get(key);
// If the queue is empty, exit early!
if (!taskQueue || !taskQueue.length) {
return;
}
// Add the key to the set so we can tell the
// task queue is already being processed.
this._executionRunningSet.add(key);
while (taskQueue.length) {
// Remove tasks from the front of the queue.
// Order matters!
const taskDetails = taskQueue.shift();
if (!taskDetails) {
continue;
}
try {
logger.verbose(`Acquired lock for "${key}", invoking task.`);
cleanupTaskDetails(taskDetails);
const value = await taskDetails.task();
taskDetails.resolve(value);
} catch (err) {
taskDetails.reject(err);
}
logger.verbose(
`Task completed for lock "${key}". Lock "${key}" has ${taskQueue.length} pending tasks.`
);
}
// Indicate that the task queue for the key is empty
// and we're done processing it.
this._executionRunningSet.delete(key);
}
private _removeTaskDetails(key: string, taskDetails: TaskDetails): void {
const taskQueue = this._keyMap.get(key);
if (!taskQueue || !taskQueue.length) {
// The task is already gone from the queue, so our work here is done!
return;
}
const index = taskQueue.indexOf(taskDetails);
if (index !== -1) {
const [details] = taskQueue.splice(index, 1);
// Cleanup the task rejection code paths.
cleanupTaskDetails(details);
}
}
}
/**
* @internal
* Returns a promise and the promise's resolve and reject methods.
*/
function getPromiseParts(): {
promise: Promise<unknown>;
resolver: (value: unknown) => void;
rejecter: (reason: Error) => void;
} {
let resolver: (value: unknown) => void;
let rejecter: (reason: Error) => void;
const promise = new Promise<unknown>((resolve, reject) => {
resolver = resolve;
rejecter = reject;
});
return {
promise,
resolver: resolver!,
rejecter: rejecter!
};
}
/**
* @internal
* Removes any abort listener or pending timeout from a task.
*/
function cleanupTaskDetails(taskDetails: TaskDetails): void {
// Cleanup the task rejection code paths.
if (taskDetails.tid) clearTimeout(taskDetails.tid);
if (taskDetails.abortSignal && taskDetails.abortListener) {
taskDetails.abortSignal.removeEventListener("abort", taskDetails.abortListener);
}
}

Просмотреть файл

@ -6,6 +6,7 @@ import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { WebSocketImpl } from "rhea-promise";
import { isDefined } from "./typeGuards";
import { StandardAbortMessage } from "../errors";
import { CancellableAsyncLock, CancellableAsyncLockImpl } from "./lock";
export { AsyncLock };
/**
@ -128,6 +129,11 @@ export function getNewAsyncLock(options?: AsyncLockOptions): AsyncLock {
*/
export const defaultLock: AsyncLock = new AsyncLock({ maxPending: 10000 });
/**
* The cancellable async lock instance.
*/
export const defaultCancellableLock: CancellableAsyncLock = new CancellableAsyncLockImpl();
/**
* @internal
*

Просмотреть файл

@ -38,11 +38,13 @@ describe("CbsClient", function() {
// Make the existing `init` invocation wait until the abortSignal
// is aborted before acquiring it's lock.
await defaultLock.acquire(lock, (done) => {
setTimeout(() => {
controller.abort();
done();
}, 0);
await defaultLock.acquire(lock, () => {
return new Promise<void>((resolve) => {
setTimeout(() => {
controller.abort();
resolve();
}, 0);
});
});
try {

Просмотреть файл

@ -0,0 +1,383 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { AbortController, AbortError } from "@azure/abort-controller";
import * as chai from "chai";
import { OperationTimeoutError } from "rhea-promise";
import { delay } from "../src";
const should = chai.should();
import { CancellableAsyncLock, CancellableAsyncLockImpl } from "../src/util/lock";
import { settleAllTasks } from "./utils/utils";
describe("CancellableAsyncLock", function() {
const TEST_FAILURE = "Test failure";
describe(".acquire", function() {
let lock: CancellableAsyncLock;
beforeEach("create lock", () => {
lock = new CancellableAsyncLockImpl();
});
it("forwards values from task", async () => {
const expectedValues = ["foo", "bar", 3.14159, new Date(), {}, null];
const tasks: Promise<any>[] = [];
for (const val of expectedValues) {
tasks.push(
lock.acquire("lock", async () => val, {
timeoutInMs: undefined,
abortSignal: undefined
})
);
}
const results = await Promise.all(tasks);
results.should.deep.equal(expectedValues, "Unexpected value returned from tasks.");
});
it("forwards error from task", async () => {
try {
await lock.acquire(
"lock",
async () => {
throw new Error("I break things!");
},
{ timeoutInMs: undefined, abortSignal: undefined }
);
throw new Error(TEST_FAILURE);
} catch (err) {
should.equal(err.message, "I break things!");
}
});
it("works using single key", async () => {
const taskCount = 10;
const tasks: Promise<number>[] = [];
for (let i = 0; i < taskCount; i++) {
tasks.push(
lock.acquire(
"lock",
async () => {
// Add a delay such that later tasks would resolve
// faster than early tasks if they all ran at the
// same time.
await delay(taskCount - i);
return i;
},
{ timeoutInMs: undefined, abortSignal: undefined }
)
);
}
// verify order
for (let i = 0; i < taskCount; i++) {
const result = await Promise.race(tasks);
should.equal(result, i, "Tasks ran out of order.");
// Since tasks should be completed in order, remove head task.
tasks.shift();
}
should.equal(tasks.length, 0, "There are still tasks pending.");
});
it("keys are isolated", async () => {
/*
We enqueue 2 tasks on key "1", and 2 tasks on key "2".
Each task has a delay so that it yields control to the event loop
before resolving.
The 1st task on key "1" will resolve first.
The 1st task on key "2" will resolve second because it does not
need to wait for tasks on key "1" to resolve.
The 2nd task on key "1" will resolve third. It must wait for
the 1st tak on key "1" to resolve.
The 2nd task on key "2" will resolve last. It must wait for
the 1st task on key "2" to resolve.
*/
const tasks = [
lock.acquire(
"1",
async () => {
await delay(0);
return 0;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"1",
async () => {
await delay(0);
return 2;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"2",
async () => {
await delay(0);
return 1;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"1",
async () => {
await delay(0);
return 3;
},
{ timeoutInMs: undefined, abortSignal: undefined }
)
];
const results: number[] = [];
const queue: Promise<number>[] = [];
for (const task of tasks) {
queue.push(task);
task
.then((value) => {
results.push(value);
queue.splice(queue.indexOf(task), 1);
return;
})
.catch(() => {
/* no-op */
});
}
while (queue.length) {
await Promise.race(queue);
}
results.should.deep.equal([0, 1, 2, 3], "Tasks completed out of order.");
});
it("supports timeouts", async () => {
const tasks = [
lock.acquire(
"lock",
async () => {
await delay(0);
return 0;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 1;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 2;
},
{ timeoutInMs: 0, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 3;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 4;
},
{ timeoutInMs: 0, abortSignal: undefined }
)
];
const results = await settleAllTasks(tasks);
results.length.should.equal(5, "Unexpected number of tasks completed.");
const expectedResults = [0, 1, OperationTimeoutError, 3, OperationTimeoutError];
for (let i = 0; i < results.length; i++) {
const value = results[i];
const expectedResult = expectedResults[i];
if (typeof expectedResult === "number") {
should.equal(value, expectedResult, "Unexpected task value.");
} else {
should.equal(value instanceof expectedResult, true, "Unexpected task value.");
}
}
});
it("supports cancellation (already cancelled)", async () => {
const abortController = new AbortController();
abortController.abort();
const abortSignal = abortController.signal;
const tasks = [
lock.acquire(
"lock",
async () => {
await delay(0);
return 0;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 1;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 2;
},
{ abortSignal, timeoutInMs: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 3;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 4;
},
{ abortSignal, timeoutInMs: undefined }
)
];
const results: any[] = [];
for (const task of tasks) {
task
.then((value) => {
results.push(value);
tasks.splice(tasks.indexOf(task), 1);
return;
})
.catch((err) => {
results.push(err);
tasks.splice(tasks.indexOf(task), 1);
});
}
while (tasks.length) {
try {
await Promise.race(tasks);
} catch (err) {
/* no-op */
}
}
tasks.length.should.equal(0, "Queue of tasks not empty.");
results.length.should.equal(5, "Unexpected number of tasks completed.");
const expectedResults = [AbortError, AbortError, 0, 1, 3];
for (let i = 0; i < results.length; i++) {
const value = results[i];
const expectedResult = expectedResults[i];
if (typeof expectedResult === "number") {
should.equal(value, expectedResult, "Unexpected task value.");
} else {
should.equal(value.name, expectedResult.name, "Unexpected task value.");
}
}
});
it("supports cancellation", async () => {
const abortController = new AbortController();
setTimeout(() => abortController.abort(), 0);
const abortSignal = abortController.signal;
const tasks = [
lock.acquire(
"lock",
async () => {
await delay(0);
return 0;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 1;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 2;
},
{ abortSignal, timeoutInMs: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 3;
},
{ timeoutInMs: undefined, abortSignal: undefined }
),
lock.acquire(
"lock",
async () => {
await delay(0);
return 4;
},
{ abortSignal, timeoutInMs: undefined }
)
];
const results: any[] = [];
for (const task of tasks) {
task
.then((value) => {
results.push(value);
tasks.splice(tasks.indexOf(task), 1);
return;
})
.catch((err) => {
results.push(err);
tasks.splice(tasks.indexOf(task), 1);
});
}
while (tasks.length) {
try {
await Promise.race(tasks);
} catch (err) {
/* no-op */
}
}
tasks.length.should.equal(0, "Queue of tasks not empty.");
results.length.should.equal(5, "Unexpected number of tasks completed.");
const expectedResults = [AbortError, AbortError, 0, 1, 3];
for (let i = 0; i < results.length; i++) {
const value = results[i];
const expectedResult = expectedResults[i];
if (typeof expectedResult === "number") {
should.equal(value, expectedResult, "Unexpected task value.");
} else {
should.equal(value.name, expectedResult.name, "Unexpected task value.");
}
}
});
});
});

Просмотреть файл

@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
/**
* Similar to Promise.allSettled which isn't available in all of our supported environments.
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/allSettled
*/
export async function settleAllTasks(tasks: Promise<any>[]): Promise<any[]> {
// add no-op catch to each task to prevent unhandled promise rejection in node 15+
for (const task of tasks) {
task.catch(() => {
/* no-op */
});
}
const results = [];
for (const task of tasks) {
try {
const result = await task;
results.push(result);
} catch (err) {
results.push(err);
}
}
return results;
}

Просмотреть файл

@ -99,7 +99,7 @@
"is-buffer": "^2.0.3",
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea-promise": "^1.2.0",
"rhea-promise": "^1.2.1",
"tslib": "^2.0.0",
"uuid": "^8.3.0"
},

Просмотреть файл

@ -27,6 +27,7 @@ import { LinkEntity } from "./linkEntity";
import { EventPosition, getEventPositionFilter } from "./eventPosition";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { defaultDataTransformer } from "./dataTransformer";
import { getRetryAttemptTimeoutInMs } from "./util/retries";
/**
* @hidden
@ -452,10 +453,10 @@ export class EventHubReceiver extends LinkEntity {
if (!this.isOpen()) {
try {
await this.initialize();
if (abortSignal && abortSignal.aborted) {
await this.abort();
}
await this.initialize({
abortSignal,
timeoutInMs: getRetryAttemptTimeoutInMs(this.options.retryOptions)
});
} catch (err) {
if (this._onError === onError) {
onError(err);
@ -541,14 +542,20 @@ export class EventHubReceiver extends LinkEntity {
* Creates a new AMQP receiver under a new AMQP session.
* @hidden
*/
async initialize(): Promise<void> {
async initialize({
abortSignal,
timeoutInMs
}: {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}): Promise<void> {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;
// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();
await this._negotiateClaim({ setTokenRenewal: false, abortSignal, timeoutInMs });
const receiverOptions: CreateReceiverOptions = {
onClose: (context: EventContext) => this._onAmqpClose(context),
@ -568,7 +575,7 @@ export class EventHubReceiver extends LinkEntity {
this.name,
options
);
this._receiver = await this._context.connection.createReceiver(options);
this._receiver = await this._context.connection.createReceiver({ ...options, abortSignal });
this.isConnecting = false;
logger.verbose(
"[%s] Receiver '%s' created with receiver options: %O",
@ -794,14 +801,29 @@ export class EventHubReceiver extends LinkEntity {
};
const retryOptions = this.options.retryOptions || {};
const config: RetryConfig<ReceivedEventData[]> = {
connectionHost: this._context.config.host,
connectionId: this._context.connectionId,
operation: retrieveEvents,
operationType: RetryOperationType.receiveMessage,
abortSignal: abortSignal,
retryOptions: retryOptions
};
const config: RetryConfig<ReceivedEventData[]> = Object.defineProperties(
{
operation: retrieveEvents,
operationType: RetryOperationType.receiveMessage,
abortSignal: abortSignal,
retryOptions: retryOptions
},
{
connectionId: {
enumerable: true,
get: () => {
return this._context.connectionId;
}
},
connectionHost: {
enumerable: true,
get: () => {
return this._context.config.host;
}
}
}
);
return retry<ReceivedEventData[]>(config);
}
}

Просмотреть файл

@ -18,7 +18,7 @@ import {
RetryConfig,
RetryOperationType,
RetryOptions,
defaultLock,
defaultCancellableLock,
retry,
translate
} from "@azure/core-amqp";
@ -27,12 +27,11 @@ import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import { EventHubProducerOptions } from "./models/private";
import { SendOptions } from "./models/public";
import { getRetryAttemptTimeoutInMs } from "./util/retries";
import { AbortSignalLike } from "@azure/abort-controller";
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
import { defaultDataTransformer } from "./dataTransformer";
import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils";
/**
* Describes the EventHubSender that will send event data to EventHub.
* @internal
@ -459,20 +458,21 @@ export class EventHubSender extends LinkEntity {
retryOptions.timeoutInMs = timeoutInMs;
const senderOptions = this._createSenderOptions(timeoutInMs);
const startTime = Date.now();
const createLinkPromise = async (): Promise<void> => {
return waitForTimeoutOrAbortOrResolve({
actionFn: () => {
return defaultLock.acquire(this.senderLock, () => {
return this._init(senderOptions);
return defaultCancellableLock.acquire(
this.senderLock,
() => {
const taskStartTime = Date.now();
const taskTimeoutInMs = timeoutInMs - (taskStartTime - startTime);
return this._init({
...senderOptions,
abortSignal: options.abortSignal,
timeoutInMs: taskTimeoutInMs
});
},
abortSignal: options?.abortSignal,
timeoutMs: timeoutInMs,
timeoutMessage:
`[${this._context.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", cannot be created right now, due ` +
`to operation timeout.`
});
{ abortSignal: options.abortSignal, timeoutInMs: timeoutInMs }
);
};
const config: RetryConfig<void> = {
@ -502,14 +502,23 @@ export class EventHubSender extends LinkEntity {
* Initializes the sender session on the connection.
* @hidden
*/
private async _init(options: AwaitableSenderOptions): Promise<void> {
private async _init(
options: AwaitableSenderOptions & {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}
): Promise<void> {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;
// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();
await this._negotiateClaim({
setTokenRenewal: false,
abortSignal: options.abortSignal,
timeoutInMs: options.timeoutInMs
});
logger.verbose(
"[%s] Trying to create sender '%s'...",

Просмотреть файл

@ -2,11 +2,13 @@
// Licensed under the MIT license.
import { v4 as uuid } from "uuid";
import { Constants, TokenType, defaultLock, isSasTokenProvider } from "@azure/core-amqp";
import { Constants, TokenType, defaultCancellableLock, isSasTokenProvider } from "@azure/core-amqp";
import { AccessToken } from "@azure/core-auth";
import { ConnectionContext } from "./connectionContext";
import { AwaitableSender, Receiver } from "rhea-promise";
import { logger } from "./log";
import { getRetryAttemptTimeoutInMs } from "./util/retries";
import { AbortSignalLike } from "@azure/abort-controller";
/**
* @hidden
@ -109,10 +111,17 @@ export class LinkEntity {
/**
* Negotiates cbs claim for the LinkEntity.
* @hidden
* @param setTokenRenewal - Set the token renewal timer. Default false.
* @returns Promise<void>
*/
protected async _negotiateClaim(setTokenRenewal?: boolean): Promise<void> {
protected async _negotiateClaim({
abortSignal,
setTokenRenewal,
timeoutInMs
}: {
setTokenRenewal: boolean | undefined;
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}): Promise<void> {
// Acquire the lock and establish a cbs session if it does not exist on the connection.
// Although node.js is single threaded, we need a locking mechanism to ensure that a
// race condition does not happen while creating a shared resource (in this case the
@ -126,9 +135,22 @@ export class LinkEntity {
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, () => {
return this._context.cbsSession.init();
});
const startTime = Date.now();
if (!this._context.cbsSession.isOpen()) {
await defaultCancellableLock.acquire(
this._context.cbsSession.cbsLock,
() => {
return this._context.cbsSession.init({
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
});
},
{
abortSignal,
timeoutInMs
}
);
}
let tokenObject: AccessToken;
let tokenType: TokenType;
if (isSasTokenProvider(this._context.tokenCredential)) {
@ -162,9 +184,21 @@ export class LinkEntity {
this.name,
this.address
);
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType);
});
await defaultCancellableLock.acquire(
this._context.negotiateClaimLock,
() => {
return this._context.cbsSession.negotiateClaim(
this.audience,
tokenObject.token,
tokenType,
{ abortSignal, timeoutInMs: timeoutInMs - (Date.now() - startTime) }
);
},
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
logger.verbose(
"[%s] Negotiated claim for %s '%s' with with address: %s",
this._context.connectionId,
@ -193,7 +227,11 @@ export class LinkEntity {
}
this._tokenRenewalTimer = setTimeout(async () => {
try {
await this._negotiateClaim(true);
await this._negotiateClaim({
setTokenRenewal: true,
abortSignal: undefined,
timeoutInMs: getRetryAttemptTimeoutInMs(undefined)
});
} catch (err) {
logger.verbose(
"[%s] %s '%s' with address %s, an error occurred while renewing the token: %O",

Просмотреть файл

@ -9,7 +9,7 @@ import {
RetryOperationType,
RetryOptions,
SendRequestOptions,
defaultLock,
defaultCancellableLock,
isSasTokenProvider,
retry,
translate
@ -33,7 +33,6 @@ import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "
import { SpanStatusCode } from "@azure/core-tracing";
import { OperationOptions } from "./util/operationOptions";
import { createEventHubSpan } from "./diagnostics/tracing";
import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils";
/**
* Describes the runtime information of an Event Hub.
@ -307,12 +306,18 @@ export class ManagementClient extends LinkEntity {
}
}
private async _init(): Promise<void> {
private async _init({
abortSignal,
timeoutInMs
}: {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}): Promise<void> {
try {
if (!this._isMgmtRequestResponseLinkOpen()) {
// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();
await this._negotiateClaim({ setTokenRenewal: false, abortSignal, timeoutInMs });
const rxopt: ReceiverOptions = {
source: { address: this.address },
name: this.replyTo,
@ -341,7 +346,8 @@ export class ManagementClient extends LinkEntity {
this._mgmtReqResLink = await RequestResponseLink.create(
this._context.connection,
sropt,
rxopt
rxopt,
{ abortSignal }
);
this._mgmtReqResLink.sender.on(SenderEvents.senderError, (context: EventContext) => {
const id = context.connection.options.id;
@ -405,18 +411,17 @@ export class ManagementClient extends LinkEntity {
);
const initOperationStartTime = Date.now();
try {
await waitForTimeoutOrAbortOrResolve({
actionFn: () => {
return defaultLock.acquire(this.managementLock, () => {
return this._init();
});
await defaultCancellableLock.acquire(
this.managementLock,
() => {
const acquireLockEndTime = Date.now();
const timeoutInMs =
retryTimeoutInMs - (acquireLockEndTime - initOperationStartTime);
return this._init({ abortSignal, timeoutInMs });
},
abortSignal: options?.abortSignal,
timeoutMs: retryTimeoutInMs,
timeoutMessage: `The request with message_id "${request.message_id}" timed out. Please try again later.`
});
{ abortSignal, timeoutInMs: retryTimeoutInMs }
);
} catch (err) {
const translatedError = translate(err);
logger.warning(
@ -451,13 +456,22 @@ export class ManagementClient extends LinkEntity {
return this._mgmtReqResLink!.sendRequest(request, sendRequestOptions);
};
const config: RetryConfig<Message> = {
operation: sendOperationPromise,
connectionId: this._context.connectionId,
operationType: RetryOperationType.management,
abortSignal: abortSignal,
retryOptions: retryOptions
};
const config: RetryConfig<Message> = Object.defineProperties(
{
operation: sendOperationPromise,
operationType: RetryOperationType.management,
abortSignal: abortSignal,
retryOptions: retryOptions
},
{
connectionId: {
enumerable: true,
get: () => {
return this._context.connectionId;
}
}
}
);
return (await retry<Message>(config)).body;
} catch (err) {
const translatedError = translate(err);

Просмотреть файл

@ -9,7 +9,6 @@ import debugModule from "debug";
const debug = debugModule("azure:event-hubs:receiver-spec");
import {
EventData,
MessagingError,
ReceivedEventData,
latestEventPosition,
earliestEventPosition,
@ -484,51 +483,4 @@ describe("EventHubConsumerClient", function(): void {
await subscription!.close();
});
});
describe("Negative scenarios", function(): void {
it("should throw MessagingEntityNotFoundError for non existing consumer group", async function(): Promise<
void
> {
const badConsumerClient = new EventHubConsumerClient(
"boo",
service.connectionString,
service.path
);
let subscription: Subscription | undefined;
const caughtErr = await new Promise<Error | MessagingError>((resolve) => {
subscription = badConsumerClient.subscribe({
processEvents: async () => {
/* no-op */
},
processError: async (err) => {
resolve(err);
}
});
});
await subscription!.close();
await badConsumerClient.close();
should.exist(caughtErr);
should.equal((caughtErr as MessagingError).code, "MessagingEntityNotFoundError");
});
it(`should throw an invalid EventHub address error for invalid partition`, async function(): Promise<
void
> {
let subscription: Subscription | undefined;
const caughtErr = await new Promise<Error | MessagingError>((resolve) => {
subscription = consumerClient.subscribe("boo", {
processEvents: async () => {
/* no-op */
},
processError: async (err) => {
resolve(err);
}
});
});
await subscription!.close();
should.exist(caughtErr);
should.equal((caughtErr as MessagingError).code, "ArgumentOutOfRangeError");
});
});
}).timeout(90000);

Просмотреть файл

@ -111,7 +111,7 @@
"long": "^4.0.0",
"process": "^0.11.10",
"tslib": "^2.0.0",
"rhea-promise": "^1.2.0"
"rhea-promise": "^1.2.1"
},
"devDependencies": {
"@azure/dev-tool": "^1.0.0",