[service-bus] Allow initLink() and closeLink() to lock, preventing reentrancy/concurrency issues (#10807)

initLink and closeLink had some basic coordination so they wouldn't stomp over each other but they never coordinated via locks which made it so the state of the object would be different than you expected.

This commit makes it so both closeLink and openLink depend on the same lock, which allows you to make the modifications to the link state obey ordering. This simplifies some parts of the code that tried to avoid opening and closing simultaneously.

As part of this the lock token that was being used in other parts of the code has been subsumed into LinkEntity instead so some modification to the detached/init methods of child classes changed.

Also, removed some code in ManagementLink that has become redundant with our unified initLink() code (abort signal handling and some link state checking).

Fixes #10656
This commit is contained in:
Richard Park 2020-08-28 16:25:47 -07:00 коммит произвёл GitHub
Родитель ed7b03cab2
Коммит 5c4a952dff
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 573 добавлений и 570 удалений

Просмотреть файл

@ -73,6 +73,12 @@ export interface ConnectionContext extends ConnectionContextBase {
* Creates one if none exists in the cache
*/
getManagementClient(entityPath: string): ManagementClient;
/**
* Indicates whether the connection is in the process of closing.
* When this returns `true`, a `disconnected` event will be received
* after the connection is closed.
*/
isConnectionClosing(): boolean;
}
/**
@ -82,13 +88,6 @@ export interface ConnectionContext extends ConnectionContextBase {
* @internal
*/
export interface ConnectionContextInternalMembers extends ConnectionContext {
/**
* Indicates whether the connection is in the process of closing.
* When this returns `true`, a `disconnected` event will be received
* after the connection is closed.
*
*/
isConnectionClosing(): boolean;
/**
* Resolves once the context's connection emits a `disconnected` event.
*/
@ -173,6 +172,7 @@ export namespace ConnectionContext {
// This can happen when the idle timeout has been reached but
// the underlying socket is waiting to be destroyed.
if (this.isConnectionClosing()) {
log.error(`[${this.connectionId}] Connection is closing, waiting for disconnected event`);
// Wait for the disconnected event that indicates the underlying socket has closed.
await this.waitForDisconnectedEvent();
}
@ -194,8 +194,13 @@ export namespace ConnectionContext {
waitForConnectionReset() {
// Check if the connection is currently in the process of disconnecting.
if (waitForConnectionRefreshPromise) {
log.error(`[${this.connectionId}] Waiting for connection reset`);
return waitForConnectionRefreshPromise;
}
log.error(
`[${this.connectionId}] Connection not waiting to be reset. Resolving immediately.`
);
return Promise.resolve();
},
getReceiverFromCache(
@ -256,6 +261,7 @@ export namespace ConnectionContext {
if (waitForConnectionRefreshPromise) {
return;
}
waitForConnectionRefreshPromise = new Promise((resolve) => {
waitForConnectionRefreshResolve = resolve;
});
@ -320,7 +326,7 @@ export namespace ConnectionContext {
// Call onDetached() on sender so that it can gracefully shutdown
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (sender && !sender.isConnecting) {
if (sender) {
log.error(
"[%s] calling detached on sender '%s'.",
connectionContext.connection.id,
@ -343,7 +349,7 @@ export namespace ConnectionContext {
// and streaming receivers can decide whether to reconnect or not.
for (const receiverName of Object.keys(connectionContext.messageReceivers)) {
const receiver = connectionContext.messageReceivers[receiverName];
if (receiver && !receiver.isConnecting) {
if (receiver) {
log.error(
"[%s] calling detached on %s receiver '%s'.",
connectionContext.connection.id,

Просмотреть файл

@ -7,13 +7,15 @@ import {
SharedKeyCredential,
TokenType,
defaultLock,
RequestResponseLink
RequestResponseLink,
MessagingError
} from "@azure/core-amqp";
import { ConnectionContext } from "../connectionContext";
import * as log from "../log";
import {
AwaitableSender,
AwaitableSenderOptions,
generate_uuid,
Receiver,
ReceiverOptions,
SenderOptions
@ -154,6 +156,10 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
*/
private _logPrefix: string;
protected get logPrefix(): string {
return this._logPrefix;
}
private _logger: typeof log.error;
/**
@ -162,7 +168,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
*/
private _wasClosedPermanently: boolean = false;
private _isConnecting: boolean = false;
/**
* A lock that ensures that opening and closing this
* link properly cooperate.
*/
private _openLock: string = generate_uuid();
/**
* Creates a new ClientEntity instance.
@ -181,7 +191,7 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this.address = options.address || "";
this.audience = options.audience || "";
this.name = getUniqueName(name);
this._logPrefix = `[${context.connectionId}|${this._linkType}:${this.name}|a:${this.address}]`;
this._logPrefix = `[${context.connectionId}|${this._linkType}:${this.name}]`;
this._logger = LinkEntity.getLogger(this._linkType);
}
@ -196,13 +206,6 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
return result;
}
/**
* Indicates that a link initialization is in process.
*/
get isConnecting(): boolean {
return this._isConnecting;
}
/**
* Initializes this LinkEntity, setting this._link with the result of `createRheaLink`, which
* is implemented by child classes.
@ -210,6 +213,22 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
* @returns A Promise that resolves when the link has been properly initialized
*/
async initLink(options: LinkOptionsT<LinkT>, abortSignal?: AbortSignalLike): Promise<void> {
// we'll check that the connection isn't in the process of recycling (and if so, wait for it to complete)
await this._context.readyToOpenLink();
log.error(
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for initializing link`
);
return defaultLock.acquire(this._openLock, () => {
log.error(`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`);
return this._initLinkImpl(options, abortSignal);
});
}
private async _initLinkImpl(
options: LinkOptionsT<LinkT>,
abortSignal?: AbortSignalLike
): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
@ -221,12 +240,12 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
if (options.name) {
this.name = options.name;
this._logPrefix = `[${connectionId}|${this._linkType}:${this.name}|a:${this.address}]`;
this._logPrefix = `[${connectionId}|${this._linkType}:${this.name}]`;
}
if (this._wasClosedPermanently) {
log.error(`${this._logPrefix} Link has been closed. Not reopening.`);
return;
log.error(`${this._logPrefix} Link has been permanently closed. Not reopening.`);
throw new AbortError(`Link has been permanently closed. Not reopening.`);
}
if (this.isOpen()) {
@ -234,38 +253,25 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
return;
}
if (this.isConnecting) {
log.error(`${this._logPrefix} Link is currently opening. Returning.`);
return;
}
log.error(`${this._logPrefix} Is not open and is not currently connecting. Opening.`);
this._isConnecting = true;
try {
await this._negotiateClaim();
checkAborted();
this.checkIfConnectionReady();
this._logger(`${this._logPrefix} Creating with options %O`, options);
this._link = await this.createRheaLink(options);
checkAborted();
if (this._wasClosedPermanently) {
// the user attempted to close while we were still initializing the link. Abort
// the current operation. This also makes it so the operation is non-retryable.
log.error(`${this._logPrefix} Link closed while it was initializing.`);
throw new AbortError("Link closed while initializing.");
}
this._ensureTokenRenewal();
this._logger(`${this._logPrefix} Link has been created.`);
} catch (err) {
await this.closeLink();
log.error(`${this._logPrefix} Error thrown when creating the link:`, err);
await this.closeLinkImpl();
throw err;
} finally {
this._isConnecting = false;
}
}
@ -324,8 +330,18 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
* Closes the internally held rhea link, stops the token renewal timer and sets
* the this._link field to undefined.
*/
protected async closeLink(): Promise<void> {
this._logger(`${this._logPrefix} closeLink() called`);
protected closeLink(): Promise<void> {
log.error(
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for closing link`
);
return defaultLock.acquire(this._openLock, () => {
log.error(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
return this.closeLinkImpl();
});
}
private async closeLinkImpl(): Promise<void> {
this._logger(`${this._logPrefix} closeLinkImpl() called`);
clearTimeout(this._tokenRenewalTimer as NodeJS.Timer);
this._tokenRenewalTimer = undefined;
@ -394,8 +410,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
* @return {Promise<void>} Promise<void>
*/
private async _negotiateClaim(setTokenRenewal?: boolean): Promise<void> {
log.error(`${this._logPrefix} negotiateclaim() has been called`);
// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
this.checkIfConnectionReady();
// Acquire the lock and establish a cbs session if it does not exist on the connection.
// Although node.js is single threaded, we need a locking mechanism to ensure that a
// race condition does not happen while creating a shared resource (in this case the
@ -409,7 +428,8 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, () => {
await defaultLock.acquire(this._context.cbsSession.cbsLock, async () => {
this.checkIfConnectionReady();
return this._context.cbsSession.init();
});
let tokenObject: AccessToken;
@ -447,6 +467,7 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
throw new Error("Token cannot be null");
}
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
this.checkIfConnectionReady();
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject, tokenType);
});
this._logger(
@ -461,6 +482,22 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
}
}
/**
* Checks to see if the connection is in a "reopening" state. If it is
* we need to _not_ use it otherwise we'll trigger some race conditions
* within rhea (for instance, errors about _process not being defined).
*/
private checkIfConnectionReady() {
if (!this._context.isConnectionClosing()) {
return;
}
log.error(`${this._logPrefix} Connection is reopening, aborting link initialization.`);
const err = new MessagingError("Connection is reopening, aborting link initialization.");
err.retryable = true;
throw err;
}
/**
* Ensures that the token is renewed within the predefined renewal margin.
* @returns {void}

Просмотреть файл

@ -20,7 +20,6 @@ import {
MessagingError,
RequestResponseLink,
SendRequestOptions,
defaultLock,
translate
} from "@azure/core-amqp";
import { ConnectionContext } from "../connectionContext";
@ -47,7 +46,7 @@ import { Typed } from "rhea-promise";
import { max32BitNumber } from "../util/constants";
import { Buffer } from "buffer";
import { OperationOptionsBase } from "./../modelsToBeSharedWithEventHubs";
import { AbortError } from "@azure/abort-controller";
import { AbortSignalLike } from "@azure/abort-controller";
/**
* @internal
@ -189,7 +188,6 @@ export interface ManagementClientOptions {
* to the $management endpoint over AMQP connection.
*/
export class ManagementClient extends LinkEntity<RequestResponseLink> {
readonly managementLock: string = `${Constants.managementRequestKey}-${generate_uuid()}`;
/**
* @property {string} entityPath - The name/path of the entity (queue/topic/subscription name)
* for which the management request needs to be made.
@ -223,7 +221,7 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
this.entityPath = entityPath;
}
private async _init(): Promise<void> {
private async _init(abortSignal?: AbortSignalLike): Promise<void> {
throwErrorIfConnectionClosed(this._context);
try {
const rxopt: ReceiverOptions = {
@ -243,10 +241,13 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
};
const sropt: SenderOptions = { target: { address: this.address } };
await this.initLink({
senderOptions: sropt,
receiverOptions: rxopt
});
await this.initLink(
{
senderOptions: sropt,
receiverOptions: rxopt
},
abortSignal
);
this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => {
const id = context.connection.options.id;
@ -293,55 +294,26 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
const retryTimeoutInMs =
sendRequestOptions.timeoutInMs ?? Constants.defaultOperationTimeoutInMs;
const initOperationStartTime = Date.now();
if (!this.isOpen()) {
const rejectOnAbort = () => {
const requestName = sendRequestOptions.requestName;
const desc: string =
`[${this._context.connectionId}] The request "${requestName}" ` +
`to has been cancelled by the user.`;
log.error(desc);
const error = new AbortError(
`The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.`
);
throw error;
const actionAfterTimeout = () => {
const desc: string = `The request with message_id "${request.message_id}" timed out. Please try again later.`;
const e: Error = {
name: "OperationTimeoutError",
message: desc
};
if (sendRequestOptions.abortSignal) {
if (sendRequestOptions.abortSignal.aborted) {
return rejectOnAbort();
}
// TODO: init() should respect the abort signal as well.
// See https://github.com/Azure/azure-sdk-for-js/issues/4422
}
throw e;
};
const actionAfterTimeout = () => {
const desc: string = `The request with message_id "${request.message_id}" timed out. Please try again later.`;
const e: Error = {
name: "OperationTimeoutError",
message: desc
};
const waitTimer = setTimeout(actionAfterTimeout, retryTimeoutInMs);
throw e;
};
log.mgmt("[%s] Acquiring lock to get the management req res link.", this._context.connectionId);
const waitTimer = setTimeout(actionAfterTimeout, retryTimeoutInMs);
log.mgmt(
"[%s] Acquiring lock to get the management req res link.",
this._context.connectionId
);
try {
await defaultLock.acquire(this.managementLock, () => {
return this._init();
});
} catch (err) {
throw err;
} finally {
clearTimeout(waitTimer);
}
try {
await this._init(sendRequestOptions?.abortSignal);
} finally {
clearTimeout(waitTimer);
}
// time taken by the init operation
const timeTakenByInit = Date.now() - initOperationStartTime;
// Left over time

Просмотреть файл

@ -9,7 +9,6 @@ import {
EventContext,
OnAmqpEvent,
message as RheaMessageUtil,
generate_uuid,
messageProperties
} from "rhea-promise";
import {
@ -20,7 +19,6 @@ import {
RetryConfig,
RetryOperationType,
RetryOptions,
defaultLock,
delay,
retry,
translate
@ -32,12 +30,12 @@ import {
} from "../serviceBusMessage";
import { ConnectionContext } from "../connectionContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName, waitForTimeoutOrAbortOrResolve, StandardAbortMessage } from "../util/utils";
import { getUniqueName, waitForTimeoutOrAbortOrResolve } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { AbortSignalLike } from "@azure/abort-controller";
/**
* @internal
@ -46,12 +44,6 @@ import { AbortError, AbortSignalLike } from "@azure/abort-controller";
* @class MessageSender
*/
export class MessageSender extends LinkEntity<AwaitableSender> {
/**
* @property {string} openLock The unique lock name per connection that is used to acquire the
* lock for establishing a sender link by an entity on that connection.
* @readonly
*/
readonly openLock: string = `sender-${generate_uuid()}`;
/**
* @property {OnAmqpEvent} _onAmqpError The handler function to handle errors that happen on the
* underlying sender.
@ -109,51 +101,35 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
};
this._onAmqpClose = async (context: EventContext) => {
const sender = this.link || context.sender!;
const senderError = context.sender && context.sender.error;
log.error(
"[%s] 'sender_close' event occurred on the sender '%s' with address '%s'. ",
"The associated error is: %O",
this._context.connectionId,
this.name,
this.address,
`${this.logPrefix} 'sender_close' event occurred. The associated error is: %O`,
senderError
);
if (sender && !this.isConnecting) {
// Call onDetached to clean up timers & other resources
await this.onDetached().catch((err) => {
log.error(
"[%s] Error when closing sender [%s] after 'sender_close' event: %O",
this._context.connectionId,
this.name,
err
);
});
}
await this.onDetached().catch((err) => {
log.error(
`${this.logPrefix} error when closing sender after 'sender_close' event: %O`,
err
);
});
};
this._onSessionClose = async (context: EventContext) => {
const sender = this.link || context.sender!;
const sessionError = context.session && context.session.error;
log.error(
"[%s] 'session_close' event occurred on the session of sender '%s' with address '%s'. ",
"The associated error is: %O",
this._context.connectionId,
this.name,
this.address,
`${this.logPrefix} 'session_close' event occurred. The associated error is: %O`,
sessionError
);
if (sender && !this.isConnecting) {
// Call onDetached to clean up timers & other resources
await this.onDetached().catch((err) => {
log.error(
"[%s] Error when closing sender [%s] after 'session_close' event: %O",
this._context.connectionId,
this.name,
err
);
});
}
await this.onDetached().catch((err) => {
log.error(
`${this.logPrefix} error when closing sender after 'session_close' event: %O`,
err
);
});
};
}
@ -221,80 +197,85 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
return reject(err);
}
}
const timeTakenByInit = Date.now() - initStartTime;
log.sender(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this.link!.credit,
this.link!.session.outgoing.available()
);
if (!this.link!.sendable()) {
log.sender(
"[%s] Sender '%s', waiting for 1 second for sender to become sendable",
this._context.connectionId,
this.name
);
await delay(1000);
try {
const timeTakenByInit = Date.now() - initStartTime;
log.sender(
"[%s] Sender '%s' after waiting for a second, credit: %d available: %d",
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this.link!.credit,
this.link!.session.outgoing.available()
this.link?.credit,
this.link?.session?.outgoing?.available()
);
}
if (this.link!.sendable()) {
if (timeoutInMs <= timeTakenByInit) {
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
}
try {
this.link!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000;
const delivery = await this.link!.send(
encodedMessage,
undefined,
sendBatch ? 0x80013700 : 0
);
if (!this.link?.sendable()) {
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
"[%s] Sender '%s', waiting for 1 second for sender to become sendable",
this._context.connectionId,
this.name
);
await delay(1000);
log.sender(
"[%s] Sender '%s' after waiting for a second, credit: %d available: %d",
this._context.connectionId,
this.name,
delivery.id
this.link?.credit,
this.link?.session?.outgoing?.available()
);
return resolve();
} catch (error) {
error = translate(error.innerError || error);
log.error(
"[%s] An error occurred while sending the message",
this._context.connectionId,
error
);
return reject(error);
}
} else {
// let us retry to send the message after some time.
const msg =
`[${this._context.connectionId}] Sender "${this.name}", ` +
`cannot send the message right now. Please try later.`;
log.error(msg);
const amqpError: AmqpError = {
condition: ErrorNameConditionMapper.SenderBusyError,
description: msg
};
reject(translate(amqpError));
if (this.link?.sendable()) {
if (timeoutInMs <= timeTakenByInit) {
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
}
try {
this.link.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000;
const delivery = await this.link!.send(
encodedMessage,
undefined,
sendBatch ? 0x80013700 : 0
);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.connectionId,
this.name,
delivery.id
);
return resolve();
} catch (error) {
error = translate(error.innerError || error);
log.error(
"[%s] An error occurred while sending the message",
this._context.connectionId,
error
);
return reject(error);
}
} else {
// let us retry to send the message after some time.
const msg =
`[${this._context.connectionId}] Sender "${this.name}", ` +
`cannot send the message right now. Please try later.`;
log.error(msg);
const amqpError: AmqpError = {
condition: ErrorNameConditionMapper.SenderBusyError,
description: msg
};
reject(translate(amqpError));
}
} catch (err) {
reject(err);
}
});
@ -322,43 +303,25 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
options?: AwaitableSenderOptions,
abortSignal?: AbortSignalLike
): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
try {
if (!options) {
options = this._createSenderOptions(Constants.defaultOperationTimeoutInMs);
}
};
checkAborted();
if (this.isOpen()) {
return;
await this.initLink(options, abortSignal);
} catch (err) {
err = translate(err);
log.error(
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name,
err
);
// Fix the unhelpful error messages for the OperationTimeoutError that comes from `rhea-promise`.
if ((err as MessagingError).code === "OperationTimeoutError") {
err.message = "Failed to create a sender within allocated time and retry attempts.";
}
throw err;
}
log.sender(
"Acquiring lock %s for initializing the session, sender and possibly the connection.",
this.openLock
);
return defaultLock.acquire(this.openLock, async () => {
try {
if (!options) {
options = this._createSenderOptions(Constants.defaultOperationTimeoutInMs);
}
await this.initLink(options, abortSignal);
} catch (err) {
err = translate(err);
log.error(
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name,
err
);
// Fix the unhelpful error messages for the OperationTimeoutError that comes from `rhea-promise`.
if ((err as MessagingError).code === "OperationTimeoutError") {
err.message = "Failed to create a sender within allocated time and retry attempts.";
}
throw err;
}
});
}
/**
@ -377,7 +340,7 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
* @return {boolean} boolean
*/
isOpen(): boolean {
const result: boolean = this.link! && this.link!.isOpen();
const result: boolean = this.link == null ? false : this.link.isOpen();
log.error(
"[%s] Sender '%s' with address '%s' is open? -> %s",
this._context.connectionId,

Просмотреть файл

@ -121,38 +121,15 @@ export class StreamingReceiver extends MessageReceiver {
const connectionId = this._context.connectionId;
const receiverError = context.receiver && context.receiver.error;
const receiver = this.link || context.receiver!;
if (receiverError) {
log.error(
"[%s] 'receiver_close' event occurred for receiver '%s' with address '%s'. " +
"The associated error is: %O",
connectionId,
this.name,
this.address,
receiverError
);
}
log.error(
`${this.logPrefix} 'receiver_close' event occurred. The associated error is: %O`,
receiverError
);
this._clearAllMessageLockRenewTimers();
if (receiver && !receiver.isItselfClosed()) {
if (!this.isConnecting) {
log.error(
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " +
"and the sdk did not initiate this. The receiver is not reconnecting. Hence, calling " +
"detached from the _onAmqpClose() handler.",
connectionId,
this.name,
this.address
);
await this.onDetached(receiverError);
} else {
log.error(
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " +
"and the sdk did not initate this. Moreover the receiver is already re-connecting. " +
"Hence not calling detached from the _onAmqpClose() handler.",
connectionId,
this.name,
this.address
);
}
await this.onDetached(receiverError);
} else {
log.error(
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " +
@ -169,38 +146,15 @@ export class StreamingReceiver extends MessageReceiver {
const connectionId = this._context.connectionId;
const receiver = this.link || context.receiver!;
const sessionError = context.session && context.session.error;
if (sessionError) {
log.error(
"[%s] 'session_close' event occurred for receiver '%s' with address '%s'. " +
"The associated error is: %O",
connectionId,
this.name,
this.address,
sessionError
);
}
log.error(
`${this.logPrefix} 'session_close' event occurred. The associated error is: %O`,
sessionError
);
this._clearAllMessageLockRenewTimers();
if (receiver && !receiver.isSessionItselfClosed()) {
if (!this.isConnecting) {
log.error(
"[%s] 'session_close' event occurred on the session of receiver '%s' with " +
"address '%s' and the sdk did not initiate this. Hence calling detached from the " +
"_onSessionClose() handler.",
connectionId,
this.name,
this.address
);
await this.onDetached(sessionError);
} else {
log.error(
"[%s] 'session_close' event occurred on the session of receiver '%s' with " +
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " +
"re-connecting. Hence not calling detached from the _onSessionClose() handler.",
connectionId,
this.name,
this.address
);
}
await this.onDetached(sessionError);
} else {
log.error(
"[%s] 'session_close' event occurred on the session of receiver '%s' with address " +
@ -546,6 +500,8 @@ export class StreamingReceiver extends MessageReceiver {
* @returns {Promise<void>} Promise<void>.
*/
async onDetached(receiverError?: AmqpError | Error, causedByDisconnect?: boolean): Promise<void> {
log.error(`${this.logPrefix} Detaching.`);
const connectionId = this._context.connectionId;
// User explicitly called `close` on the receiver, so link is already closed

Просмотреть файл

@ -46,37 +46,13 @@ describe("controlled connection initialization", () => {
await serviceBusClient.test.after();
});
it("open() early exits if the connection is already open (avoid taking unnecessary lock)", async () => {
await sender.open();
// open uses a lock (at the sender level) that helps us not to have overlapping open() calls.
await defaultLock.acquire(sender["_sender"]["openLock"], async () => {
// the connection is _already_ open so it doesn't attempt to take a lock
// or actually try to open the connection (we have an early exit)
const secondOpenCallPromise = sender.open();
sender["_sender"]["_negotiateClaim"] = async () => {
// this is a decent way to tell if we tried to open the connection
throw new Error(
"We won't get here at all - the connection is already open so we'll early exit."
);
};
const ret = await Promise.race([delayThatReturns999(), secondOpenCallPromise]);
// ie, the Promise<void> from sender.open() 'won' because we don't
// acquire the lock when we early-exit.
assert.notExists(ret);
});
});
it("open() properly locks to prevent multiple in-flight open() calls", async () => {
// open uses a lock (at the sender level) that helps us not to have overlapping open() calls.
let secondOpenCallPromise: Promise<void> | undefined;
// acquire the same lock that open() uses and then, while it's 100% locked,
// attempt to call .open() and see that it just blocks...
await defaultLock.acquire(sender["_sender"]["openLock"], async () => {
await defaultLock.acquire(sender["_sender"]["_openLock"], async () => {
// we need to fake the connection being closed or else `open()` won't attempt to acquire
// the lock.
sender["_sender"]["isOpen"] = () => false;

Просмотреть файл

@ -17,6 +17,7 @@ import {
} from "../utils/abortSignalTestUtils";
import { createConnectionContextForTests } from "./unittestUtils";
import { StandardAbortMessage } from "../../src/util/utils";
import { isLinkLocked } from "../utils/misc";
describe("AbortSignal", () => {
const testMessageThatDoesntMatter = {
@ -165,7 +166,7 @@ describe("AbortSignal", () => {
assert.equal(err.name, "AbortError");
}
assert.isFalse(sender.isConnecting);
assert.isFalse(isLinkLocked(sender));
});
it("...afterLock", async () => {
@ -182,7 +183,7 @@ describe("AbortSignal", () => {
assert.equal(err.name, "AbortError");
}
assert.isFalse(sender.isConnecting);
assert.isFalse(isLinkLocked(sender));
});
it("...negotiateClaim", async () => {
@ -210,7 +211,7 @@ describe("AbortSignal", () => {
assert.equal(err.name, "AbortError");
}
assert.isFalse(sender.isConnecting);
assert.isFalse(isLinkLocked(sender));
});
it("...createAwaitableSender", async () => {
@ -238,7 +239,7 @@ describe("AbortSignal", () => {
assert.equal(err.name, "AbortError");
}
assert.isFalse(sender.isConnecting);
assert.isFalse(isLinkLocked(sender));
});
});
@ -260,7 +261,7 @@ describe("AbortSignal", () => {
assert.equal(err.name, "AbortError");
}
assert.isFalse(messageReceiver.isConnecting);
assert.isFalse(isLinkLocked(messageReceiver));
});
it("...after negotiateClaim", async () => {
@ -285,7 +286,7 @@ describe("AbortSignal", () => {
assert.equal(err.name, "AbortError");
}
assert.isFalse(messageReceiver["_isConnecting"]);
assert.isFalse(isLinkLocked(messageReceiver));
});
it("...after createReceiver", async () => {
@ -310,7 +311,7 @@ describe("AbortSignal", () => {
assert.equal(err.name, "AbortError");
}
assert.isFalse(messageReceiver.isConnecting);
assert.isFalse(isLinkLocked(messageReceiver));
});
});
});

Просмотреть файл

@ -2,11 +2,15 @@
// Licensed under the MIT license.
import { AbortSignalLike } from "@azure/abort-controller";
import { defaultLock } from "@azure/core-amqp";
import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { Receiver, ReceiverOptions } from "rhea-promise";
import sinon from "sinon";
import { ConnectionContext } from "../../src/connectionContext";
import { LinkEntity } from "../../src/core/linkEntity";
import * as log from "../../src/log";
import { isLinkLocked } from "../utils/misc";
import { createConnectionContextForTests, createRheaReceiverForTests } from "./unittestUtils";
chai.use(chaiAsPromised);
const assert = chai.assert;
@ -19,9 +23,11 @@ describe("LinkEntity unit tests", () => {
}
let linkEntity: LinkEntity<Receiver>;
let connectionContext: ConnectionContext;
beforeEach(() => {
linkEntity = new LinkForTests("some initial name", createConnectionContextForTests(), "sr", {
beforeEach(function() {
connectionContext = createConnectionContextForTests();
linkEntity = new LinkForTests("some initial name", connectionContext, "sr", {
address: "my-address"
});
});
@ -30,205 +36,285 @@ describe("LinkEntity unit tests", () => {
await linkEntity.close();
});
it("initLink - basic case", async () => {
assert.isFalse(linkEntity.isOpen(), "link isn't yet open, the class is just created.");
describe("initLink", () => {
it("basic case", async () => {
assert.isFalse(linkEntity.isOpen(), "link isn't yet open, the class is just created.");
await linkEntity.initLink({});
assert.match(linkEntity.name, /^some initial name-.*$/);
assertLinkEntityOpen();
// when we close with 'linkonly' it closes the link but the
// link can be reopened.
await linkEntity["closeLink"]();
assertLinkEntityClosedTemporarily();
await linkEntity.initLink({});
assertLinkEntityOpen();
await linkEntity.close();
assertLinkEntityClosedPermanently();
linkEntity.initLink({});
assertLinkEntityClosedPermanently();
});
it("initLink - multiple simultaneous initLink calls are ignored", async () => {
let timesCalled = 0;
linkEntity["_negotiateClaim"] = async () => {
++timesCalled;
// this will just resolve immediately because
// we're already connecting.
await linkEntity.initLink({});
};
await linkEntity.initLink({});
assert.match(linkEntity.name, /^some initial name-.*$/);
assertLinkEntityOpen();
assert.equal(
timesCalled,
1,
"Only one negotiateClaim call should make it through since the others were turned away because of the isConnecting field"
);
});
// when we close with 'linkonly' it closes the link but the
// link can be reopened.
await linkEntity["closeLink"]();
assertLinkEntityClosedTemporarily();
it("initLink - early exit when link is already open", async () => {
await linkEntity.initLink({});
let negotiateClaimCalled = 0;
linkEntity["_negotiateClaim"] = async () => {
++negotiateClaimCalled;
};
// connection already open
await linkEntity.initLink({});
assertLinkEntityOpen();
assert.equal(negotiateClaimCalled, 0, "If the link is already open we don't open another");
});
it("initLink - early exit when link has been permanently closed", async () => {
await linkEntity.initLink({});
assert.exists(linkEntity["_tokenRenewalTimer"], "the tokenrenewal timer should have been set");
let negotiateClaimCalled = 0;
linkEntity["_negotiateClaim"] = async () => {
++negotiateClaimCalled;
};
await linkEntity.close();
assertLinkEntityClosedPermanently();
await linkEntity.initLink({});
assert.isFalse(linkEntity.isOpen(), "Link was closed and will remain closed");
assert.equal(negotiateClaimCalled, 0, "We shouldn't attempt to reopen the link.");
});
it("initLink - error handling", async () => {
linkEntity["_negotiateClaim"] = async () => {
throw new Error("SPECIAL ERROR THROWN FROM NEGOTIATECLAIM");
};
try {
await linkEntity.initLink({});
assert.fail("Should have thrown");
} catch (err) {
assert.equal(err.message, "SPECIAL ERROR THROWN FROM NEGOTIATECLAIM");
}
});
assertLinkEntityOpen();
it("initLink - abortSignal - simple abort signal flow", async () => {
try {
await linkEntity.initLink({}, {
aborted: true
} as AbortSignalLike);
assert.fail("Should have thrown.");
} catch (err) {
assert.equal(err.name, "AbortError");
}
});
await linkEntity.close();
assertLinkEntityClosedPermanently();
it("initLink - abortSignal - if a link was actually created we clean up", async () => {
let isAborted = false;
const orig = linkEntity["createRheaLink"];
let returnedReceiver: Receiver | undefined;
linkEntity["createRheaLink"] = async (options) => {
isAborted = true;
returnedReceiver = await orig.call(linkEntity, options);
assert.isTrue(
returnedReceiver.isOpen(),
"Sanity check - the returnedReceiver was open when we returned it."
);
return returnedReceiver;
};
try {
await linkEntity.initLink({}, {
get aborted(): boolean {
return isAborted;
}
} as AbortSignalLike);
assert.fail("Should have thrown");
} catch (err) {
assert.equal(err.name, "AbortError");
}
// The returned receiver was closed since we aborted
// but it's not permanent since the user didn't initiate it.
assertLinkEntityClosedTemporarily();
// we can reinitialize an aborted link.
linkEntity["createRheaLink"] = orig;
await linkEntity.initLink({});
assert.isTrue(
linkEntity.isOpen(),
"Can always reopen if the reason we closed the link is because of the abortSignal"
);
});
it("initLink - can use a custom name via options", async () => {
assert.match(linkEntity.name, /some initial name-/);
await linkEntity.initLink({
name: "some new name"
try {
await linkEntity.initLink({});
assert.fail("Should have thrown");
} catch (err) {
assert.equal(err.message, "Link has been permanently closed. Not reopening.");
assert.equal(err.name, "AbortError");
}
assertLinkEntityClosedPermanently();
});
assert.equal(linkEntity["_logPrefix"], "[connection-id|sr:some new name|a:my-address]");
it("multiple simultaneous initLink calls obey the lock", async () => {
let timesCalled = 0;
// note that specifying a name is a complete override - no additional tacking
// on of a GUID or anything happens (that's up to you when you override the
// name)
assert.equal(
linkEntity.name,
"some new name",
"Name is an exact match to the name passed in the receiver options"
);
const innerPromises: Promise<void>[] = [];
// we also update the log prefix
assert.equal(linkEntity["_logPrefix"], "[connection-id|sr:some new name|a:my-address]");
});
linkEntity["_negotiateClaim"] = async () => {
++timesCalled;
// this will just resolve immediately because
// we're already connecting.
assert.isTrue(isLinkLocked(linkEntity));
};
await linkEntity.initLink({});
assert.equal(
timesCalled,
1,
"Only one negotiateClaim call should make it through since the others were turned away because of the isConnecting field"
);
await Promise.all(innerPromises);
});
describe("connection in a 'restarting' state causes initLink to throw", () => {
it("connection in restarting state before we called initLink", async () => {
connectionContext["isConnectionClosing"] = () => true;
try {
await linkEntity.initLink({});
assert.fail("Should have thrown");
} catch (err) {
assertInitAbortError(err);
}
});
it("connection is restarting just before cbsSession.init()", async () => {
try {
const old = linkEntity["checkIfConnectionReady"];
linkEntity["checkIfConnectionReady"] = () => {
if (defaultLock.isBusy(linkEntity["_context"]["cbsSession"]["cbsLock"])) {
connectionContext["isConnectionClosing"] = () => true;
}
old.apply(linkEntity);
};
await linkEntity.initLink({});
assert.fail("Should have thrown");
} catch (err) {
assertInitAbortError(err);
}
});
it("connection is restarting just before cbsSession.negotiateClaim()", async () => {
try {
const old = linkEntity["checkIfConnectionReady"];
linkEntity["checkIfConnectionReady"] = () => {
if (defaultLock.isBusy(linkEntity["_context"]["negotiateClaimLock"])) {
connectionContext["isConnectionClosing"] = () => true;
}
old.apply(linkEntity);
};
await linkEntity.initLink({});
assert.fail("Should have thrown");
} catch (err) {
assertInitAbortError(err);
}
});
it("connection is restarting just before createRheaLink()", async () => {
try {
const old = linkEntity["_negotiateClaim"];
linkEntity["_negotiateClaim"] = async () => {
await old.apply(linkEntity);
connectionContext["isConnectionClosing"] = () => true;
};
await linkEntity.initLink({});
assert.fail("Should have thrown");
} catch (err) {
assertInitAbortError(err);
}
});
function assertInitAbortError(err: any) {
assert.equal(err.message, "Connection is reopening, aborting link initialization.");
assert.equal(err.name, "MessagingError");
assert.isTrue(
err.retryable,
"Exception thrown when the connection is closing should be retryable"
);
}
});
it("connection is not ready causes initLink to block", async () => {
connectionContext["readyToOpenLink"] = async () => {
// should be safe for readyToOpenLink to close the link object
// without this it's possible for us to deadlock.
await linkEntity["closeLink"]();
};
assert.isFalse(linkEntity.isOpen());
await linkEntity.initLink({});
assert.isTrue(linkEntity.isOpen());
});
it("early exit when link is already open", async () => {
await linkEntity.initLink({});
let negotiateClaimCalled = 0;
linkEntity["_negotiateClaim"] = async () => {
++negotiateClaimCalled;
};
// connection already open
await linkEntity.initLink({});
assertLinkEntityOpen();
assert.equal(negotiateClaimCalled, 0, "If the link is already open we don't open another");
});
it("early exit when link has been permanently closed", async () => {
await linkEntity.initLink({});
assert.exists(
linkEntity["_tokenRenewalTimer"],
"the tokenrenewal timer should have been set"
);
const negotiateClaimSpy = sinon.spy(linkEntity as any, "_negotiateClaim");
it("initLink - user closes link while it's initializing", async () => {
linkEntity["createRheaLink"] = async () => {
await linkEntity.close();
return createRheaReceiverForTests();
};
assertLinkEntityClosedPermanently();
try {
await linkEntity.initLink({});
assert.fail("Should throw");
} catch (err) {
assert.equal("Link has been permanently closed. Not reopening.", err.message);
assert.isFalse(linkEntity.isOpen(), "Link was closed and will remain closed");
assert.isFalse(negotiateClaimSpy.called, "We shouldn't attempt to reopen the link.");
}
});
it("error handling", async () => {
linkEntity["_negotiateClaim"] = async () => {
throw new Error("SPECIAL ERROR THROWN FROM NEGOTIATECLAIM");
};
try {
await linkEntity.initLink({});
assert.fail("Should have thrown");
} catch (err) {
assert.equal(err.message, "SPECIAL ERROR THROWN FROM NEGOTIATECLAIM");
}
});
it("abortSignal - simple abort signal flow", async () => {
try {
await linkEntity.initLink({}, {
aborted: true
} as AbortSignalLike);
assert.fail("Should have thrown.");
} catch (err) {
assert.equal(err.name, "AbortError");
}
});
it("abortSignal - if a link was actually created we clean up", async () => {
let isAborted = false;
const orig = linkEntity["createRheaLink"];
let returnedReceiver: Receiver | undefined;
linkEntity["createRheaLink"] = async (options) => {
isAborted = true;
returnedReceiver = await orig.call(linkEntity, options);
assert.isTrue(
returnedReceiver.isOpen(),
"Sanity check - the returnedReceiver was open when we returned it."
);
return returnedReceiver;
};
try {
await linkEntity.initLink({}, {
get aborted(): boolean {
return isAborted;
}
} as AbortSignalLike);
assert.fail("Should have thrown");
} catch (err) {
assert.equal(err.name, "AbortError");
}
// The returned receiver was closed since we aborted
// but it's not permanent since the user didn't initiate it.
assertLinkEntityClosedTemporarily();
// we can reinitialize an aborted link.
linkEntity["createRheaLink"] = orig;
await linkEntity.initLink({});
assert.isTrue(
linkEntity.isOpen(),
"Can always reopen if the reason we closed the link is because of the abortSignal"
);
});
it("can use a custom name via options", async () => {
assert.match(linkEntity.name, /some initial name-/);
try {
await linkEntity.initLink({
name: "some new name"
});
assert.fail("Should have thrown");
} catch (err) {
assert.equal(err.name, "AbortError");
}
assert.equal(linkEntity["_logPrefix"], "[connection-id|sr:some new name|a:my-address]");
assertLinkEntityClosedPermanently();
});
assert.equal(linkEntity["_logPrefix"], "[connection-id|sr:some new name]");
it("initLink - multiple closes don't cause errors", async () => {
// TODO: there is a possibility of a race condition here. We can address this
// when we properly lock around init operations that are in progress.
await linkEntity["closeLink"]();
await linkEntity["closeLink"]();
// note that specifying a name is a complete override - no additional tacking
// on of a GUID or anything happens (that's up to you when you override the
// name)
assert.equal(
linkEntity.name,
"some new name",
"Name is an exact match to the name passed in the receiver options"
);
await linkEntity.close();
await linkEntity.close();
});
// we also update the log prefix
assert.equal(linkEntity["_logPrefix"], "[connection-id|sr:some new name]");
});
it("initLink - get logger", async () => {
assert.strictEqual(LinkEntity["getLogger"]("br"), log.batching);
assert.strictEqual(LinkEntity["getLogger"]("sr"), log.streaming);
assert.strictEqual(LinkEntity["getLogger"]("s"), log.sender);
assert.strictEqual(LinkEntity["getLogger"]("m"), log.mgmt);
assert.strictEqual(LinkEntity["getLogger"]("ms"), log.messageSession);
it("multiple closes don't cause errors", async () => {
// TODO: there is a possibility of a race condition here. We can address this
// when we properly lock around init operations that are in progress.
await linkEntity["closeLink"]();
await linkEntity["closeLink"]();
await linkEntity.close();
await linkEntity.close();
});
it("get logger", async () => {
assert.strictEqual(LinkEntity["getLogger"]("br"), log.batching);
assert.strictEqual(LinkEntity["getLogger"]("sr"), log.streaming);
assert.strictEqual(LinkEntity["getLogger"]("s"), log.sender);
assert.strictEqual(LinkEntity["getLogger"]("m"), log.mgmt);
assert.strictEqual(LinkEntity["getLogger"]("ms"), log.messageSession);
});
});
function assertLinkEntityOpen(): void {

Просмотреть файл

@ -3,12 +3,11 @@
import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { ReceiverEvents, ReceiverOptions } from "rhea-promise";
import { Receiver, ReceiverEvents, ReceiverOptions } from "rhea-promise";
import { ReceivedMessage, ReceivedMessageWithLock } from "../../src";
chai.use(chaiAsPromised);
const assert = chai.assert;
import { ConnectionContext } from "../../src/connectionContext";
import { BatchingReceiver } from "../../src/core/batchingReceiver";
import { StreamingReceiver } from "../../src/core/streamingReceiver";
import { ServiceBusReceiverImpl } from "../../src/receivers/receiver";
@ -21,18 +20,11 @@ import { Constants } from "@azure/core-amqp";
describe("Receiver unit tests", () => {
describe("init() and close() interactions", () => {
function fakeContext(): ConnectionContext {
return ({
config: {},
connection: {
id: "connection-id"
},
messageReceivers: {}
} as unknown) as ConnectionContext;
}
it("close() called just after init() but before the next step", async () => {
const batchingReceiver = new BatchingReceiver(fakeContext(), "fakeEntityPath");
const batchingReceiver = new BatchingReceiver(
createConnectionContextForTests(),
"fakeEntityPath"
);
let initWasCalled = false;
batchingReceiver["_init"] = async () => {
@ -50,7 +42,10 @@ describe("Receiver unit tests", () => {
});
it("message receiver init() bails out early if object is closed()", async () => {
const messageReceiver2 = new StreamingReceiver(fakeContext(), "fakeEntityPath");
const messageReceiver2 = new StreamingReceiver(
createConnectionContextForTests(),
"fakeEntityPath"
);
await messageReceiver2.close();
@ -66,27 +61,30 @@ describe("Receiver unit tests", () => {
);
};
await messageReceiver2["_init"]({} as ReceiverOptions);
assert.isFalse(negotiateClaimWasCalled);
try {
await messageReceiver2["_init"]({} as ReceiverOptions);
assert.fail("Should throw");
} catch (err) {
assert.equal("Link has been permanently closed. Not reopening.", err.message);
assert.equal(err.name, "AbortError");
assert.isFalse(negotiateClaimWasCalled);
}
});
});
describe("subscribe()", () => {
it("subscribe and subscription.close()", async () => {
let receiverWasDrained = false;
let closeWasCalled = false;
let createdRheaReceiver: Receiver | undefined;
const receiverImpl = new ServiceBusReceiverImpl<any>(
createConnectionContextForTests({
onCreateReceiverCalled: (receiver) => {
createdRheaReceiver = receiver;
receiver.addListener(ReceiverEvents.receiverDrained, () => {
receiverWasDrained = true;
});
(receiver as any).close = () => {
closeWasCalled = true;
};
}
}),
"fakeEntityPath",
@ -101,7 +99,7 @@ describe("Receiver unit tests", () => {
// closing a subscription doesn't close out the receiver created for it.
// this allows the user a chance to resolve any outstanding messages.
assert.isFalse(
closeWasCalled,
createdRheaReceiver?.isClosed(),
"sanity check, subscription.close() does not close the receiver"
);
assert.isTrue(
@ -111,7 +109,7 @@ describe("Receiver unit tests", () => {
await receiverImpl.close();
// rhea receiver is finally closed when the overall Receiver class is closed.
assert.isTrue(closeWasCalled, "receiver should note that we closed");
assert.isTrue(createdRheaReceiver?.isClosed(), "receiver should note that we closed");
});
it("can't subscribe while another subscribe is active", async () => {

Просмотреть файл

@ -6,7 +6,7 @@ import { ServiceBusMessageBatchImpl } from "../../src/serviceBusMessageBatch";
import { ConnectionContext } from "../../src/connectionContext";
import { ServiceBusMessage } from "../../src";
import { isServiceBusMessageBatch, ServiceBusSenderImpl } from "../../src/sender";
import { DefaultDataTransformer } from "@azure/core-amqp";
import { createConnectionContextForTests } from "./unittestUtils";
const assert = chai.assert;
@ -88,24 +88,3 @@ describe("sender unit tests", () => {
});
});
});
function createConnectionContextForTests(): ConnectionContext & { initWasCalled: boolean } {
let initWasCalled = false;
const fakeConnectionContext = {
config: { endpoint: "my.service.bus" },
connectionId: "connection-id",
dataTransformer: new DefaultDataTransformer(),
cbsSession: {
cbsLock: "cbs-lock",
async init() {
initWasCalled = true;
}
},
senders: {},
managementClients: {},
initWasCalled
};
return (fakeConnectionContext as any) as ReturnType<typeof createConnectionContextForTests>;
}

Просмотреть файл

@ -11,6 +11,7 @@ import {
import { DefaultDataTransformer, AccessToken } from "@azure/core-amqp";
import { EventEmitter } from "events";
import { getUniqueName } from "../../src/util/utils";
import { Link } from "rhea-promise/typings/lib/link";
/**
* Creates a fake ConnectionContext for tests that can create semi-realistic
@ -26,11 +27,16 @@ import { getUniqueName } from "../../src/util/utils";
export function createConnectionContextForTests(options?: {
onCreateAwaitableSenderCalled?: () => void;
onCreateReceiverCalled?: (receiver: RheaReceiver) => void;
}): ConnectionContext & { initWasCalled: boolean } {
}): ConnectionContext & {
initWasCalled: boolean;
} {
let initWasCalled = false;
const fakeConnectionContext = {
async readyToOpenLink(): Promise<void> {},
isConnectionClosing(): boolean {
return false;
},
messageReceivers: {},
senders: {},
messageSessions: {},
@ -49,10 +55,11 @@ export function createConnectionContextForTests(options?: {
}
const testAwaitableSender = ({
setMaxListeners: () => testAwaitableSender,
close: async () => {}
setMaxListeners: () => testAwaitableSender
} as any) as AwaitableSender;
mockLinkProperties(testAwaitableSender);
return testAwaitableSender;
},
createReceiver: async (): Promise<RheaReceiver> => {
@ -62,6 +69,8 @@ export function createConnectionContextForTests(options?: {
options.onCreateReceiverCalled(receiver);
}
mockLinkProperties(receiver);
(receiver as any).connection = { id: "connection-id" };
return receiver;
}
@ -104,10 +113,8 @@ export function createRheaReceiverForTests(options?: ReceiverOptions) {
id: "connection-id"
};
let isOpen = true;
(receiver as any).addCredit = (credit: number) => {
if (!isOpen) {
if (!receiver.isOpen()) {
throw new Error("TEST INCONSISTENCY: trying to .addCredit() to a closed receiver");
}
@ -123,12 +130,20 @@ export function createRheaReceiverForTests(options?: ReceiverOptions) {
}
};
(receiver as any).close = async (): Promise<void> => {
mockLinkProperties(receiver);
return receiver;
}
export function mockLinkProperties(link: Link): void {
let isOpen = true;
link.close = async (): Promise<void> => {
isOpen = false;
};
(receiver as any).isOpen = () => isOpen;
return receiver;
link.isItselfClosed = () => !isOpen;
link.isOpen = () => isOpen;
link.isClosed = () => !isOpen;
}
export function getPromiseResolverForTest(): {
@ -170,3 +185,9 @@ export function defer<T>(): {
reject: actualReject!
};
}
export const retryableErrorForTests = (() => {
const err = new Error("a retryable error");
(err as any).retryable = true;
return err;
})();

Просмотреть файл

@ -269,7 +269,8 @@ describe("session tests", () => {
await receiver.getSessionState({ abortSignal: controller.signal });
throw new Error(`Test failure`);
} catch (err) {
err.message.should.equal("The getState operation has been cancelled by the user.");
err.message.should.equal("The operation was aborted.");
err.name.should.equal("AbortError");
}
});
@ -281,7 +282,8 @@ describe("session tests", () => {
await receiver.setSessionState("why", { abortSignal: controller.signal });
throw new Error(`Test failure`);
} catch (err) {
err.message.should.equal("The setState operation has been cancelled by the user.");
err.message.should.equal("The operation was aborted.");
err.name.should.equal("AbortError");
}
});
@ -293,7 +295,8 @@ describe("session tests", () => {
await receiver.renewSessionLock({ abortSignal: controller.signal });
throw new Error(`Test failure`);
} catch (err) {
err.message.should.equal("The renewSessionLock operation has been cancelled by the user.");
err.message.should.equal("The operation was aborted.");
err.name.should.equal("AbortError");
}
});
@ -307,9 +310,8 @@ describe("session tests", () => {
await receiver.receiveDeferredMessages([Long.ZERO], { abortSignal: controller.signal });
throw new Error(`Test failure`);
} catch (err) {
err.message.should.equal(
"The receiveDeferredMessages operation has been cancelled by the user."
);
err.message.should.equal("The operation was aborted.");
err.name.should.equal("AbortError");
}
});
});

Просмотреть файл

@ -1,7 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { defaultLock } from "@azure/core-amqp";
import { Delivery, ReceivedMessage } from "../../src";
import { LinkEntity } from "../../src/core/linkEntity";
import { ServiceBusMessageImpl } from "../../src/serviceBusMessage";
// some functions useful as we transition between interfaces and classes.
@ -19,3 +21,7 @@ export function getDeliveryProperty(message: ReceivedMessage): Delivery {
"Received message does not contain a .delivery member - not a ServiceBusMessageImpl instance."
);
}
export function isLinkLocked(linkEntity: LinkEntity<any>): boolean {
return defaultLock.isBusy(linkEntity["_openLock"]);
}

Просмотреть файл

@ -509,7 +509,7 @@ export async function drainReceiveAndDeleteReceiver(
function connectionString() {
if (env[EnvVarNames.SERVICEBUS_CONNECTION_STRING] == null) {
throw new Error(
`No service bus connection string defined in ${EnvVarNames.SERVICEBUS_CONNECTION_STRING}`
`No service bus connection string defined in ${EnvVarNames.SERVICEBUS_CONNECTION_STRING}. If you're in a unit test you should not be depending on the deployed environment!`
);
}