[service-bus] Retries implemented for settling messages on the receiver link. (#14867)

This was the source of a few live test pipeline failures - it was not resilient against network failures.

RequestResponseLink retries will come in a separate PR as that's a bit more involved.

(also, renamed receiver/shared.ts to receiver/receivercommon.ts, just to reduce file naming confusion amongst multiple shared.ts files)

Partly related to #13796
This commit is contained in:
Richard Park 2021-04-14 15:24:28 -07:00 коммит произвёл GitHub
Родитель cf85b3e784
Коммит 4dac947d84
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 285 добавлений и 51 удалений

Просмотреть файл

@ -512,6 +512,8 @@ export enum RetryOperationType {
// (undocumented) // (undocumented)
management = "management", management = "management",
// (undocumented) // (undocumented)
messageSettlement = "settlement",
// (undocumented)
receiveMessage = "receiveMessage", receiveMessage = "receiveMessage",
// (undocumented) // (undocumented)
receiverLink = "receiverLink", receiverLink = "receiverLink",

Просмотреть файл

@ -46,7 +46,8 @@ export enum RetryOperationType {
senderLink = "senderLink", senderLink = "senderLink",
sendMessage = "sendMessage", sendMessage = "sendMessage",
receiveMessage = "receiveMessage", receiveMessage = "receiveMessage",
session = "session" session = "session",
messageSettlement = "settlement"
} }
/** /**

Просмотреть файл

@ -1,6 +1,6 @@
# Release History # Release History
## 7.1.0-beta.1 (2021-04-07) ## 7.1.0-beta.1 (Unreleased)
### New Features ### New Features
@ -12,6 +12,8 @@
### Bug fixes ### Bug fixes
- Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug w.r.t the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692). - Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug w.r.t the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692).
- Settling messages now use the `retryOptions` passed to `ServiceBusClient`, making it more resilient against network failures.
[PR#14867](https://github.com/Azure/azure-sdk-for-js/pull/14867/files)
## 7.0.4 (2021-03-31) ## 7.0.4 (2021-03-31)

Просмотреть файл

@ -19,7 +19,8 @@ import {
Constants, Constants,
MessagingError, MessagingError,
RequestResponseLink, RequestResponseLink,
SendRequestOptions SendRequestOptions,
RetryOptions
} from "@azure/core-amqp"; } from "@azure/core-amqp";
import { ConnectionContext } from "../connectionContext"; import { ConnectionContext } from "../connectionContext";
import { import {
@ -168,6 +169,10 @@ export interface DispositionStatusOptions extends OperationOptionsBase {
* This should only be provided if `session` is enabled for a Queue or Topic. * This should only be provided if `session` is enabled for a Queue or Topic.
*/ */
sessionId?: string; sessionId?: string;
/**
* Retry options.
*/
retryOptions: RetryOptions | undefined;
} }
/** /**
@ -831,7 +836,8 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
async updateDispositionStatus( async updateDispositionStatus(
lockToken: string, lockToken: string,
dispositionType: DispositionType, dispositionType: DispositionType,
options?: DispositionStatusOptions & SendManagementRequestOptions // TODO: mgmt link retry<> will come in the next PR.
options?: Omit<DispositionStatusOptions, "retryOptions"> & SendManagementRequestOptions
): Promise<void> { ): Promise<void> {
throwErrorIfConnectionClosed(this._context); throwErrorIfConnectionClosed(this._context);
if (!options) options = {}; if (!options) options = {};

Просмотреть файл

@ -241,10 +241,9 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
async settleMessage( async settleMessage(
message: ServiceBusMessageImpl, message: ServiceBusMessageImpl,
operation: DispositionType, operation: DispositionType,
options?: DispositionStatusOptions options: DispositionStatusOptions
): Promise<any> { ): Promise<any> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!options) options = {};
if (operation.match(/^(complete|abandon|defer|deadletter)$/) == null) { if (operation.match(/^(complete|abandon|defer|deadletter)$/) == null) {
return reject(new Error(`operation: '${operation}' is not a valid operation.`)); return reject(new Error(`operation: '${operation}' is not a valid operation.`));
} }
@ -268,7 +267,7 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
"message may or may not be successful" "message may or may not be successful"
}; };
return reject(translateServiceBusError(e)); return reject(translateServiceBusError(e));
}, Constants.defaultOperationTimeoutInMs); }, options.retryOptions?.timeoutInMs ?? Constants.defaultOperationTimeoutInMs);
this._deliveryDispositionMap.set(delivery.id, { this._deliveryDispositionMap.set(delivery.id, {
resolve: resolve, resolve: resolve,
reject: reject, reject: reject,

Просмотреть файл

@ -27,7 +27,7 @@ import { AmqpError, EventContext, OnAmqpEvent } from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage"; import { ServiceBusMessageImpl } from "../serviceBusMessage";
import { AbortSignalLike } from "@azure/abort-controller"; import { AbortSignalLike } from "@azure/abort-controller";
import { translateServiceBusError } from "../serviceBusError"; import { translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/shared"; import { abandonMessage, completeMessage } from "../receivers/receiverCommon";
import { ReceiverHandlers } from "./shared"; import { ReceiverHandlers } from "./shared";
/** /**
@ -273,7 +273,13 @@ export class StreamingReceiver extends MessageReceiver {
this.name, this.name,
error error
); );
await abandonMessage(bMessage, this._context, entityPath); await abandonMessage(
bMessage,
this._context,
entityPath,
undefined,
this._retryOptions
);
} catch (abandonError) { } catch (abandonError) {
const translatedError = translateServiceBusError(abandonError); const translatedError = translateServiceBusError(abandonError);
logger.logError( logger.logError(
@ -310,7 +316,7 @@ export class StreamingReceiver extends MessageReceiver {
this.logPrefix, this.logPrefix,
bMessage.messageId bMessage.messageId
); );
await completeMessage(bMessage, this._context, entityPath); await completeMessage(bMessage, this._context, entityPath, this._retryOptions);
} catch (completeError) { } catch (completeError) {
const translatedError = translateServiceBusError(completeError); const translatedError = translateServiceBusError(completeError);
logger.logError( logger.logError(

Просмотреть файл

@ -33,7 +33,7 @@ import {
deferMessage, deferMessage,
getMessageIterator, getMessageIterator,
wrapProcessErrorHandler wrapProcessErrorHandler
} from "./shared"; } from "./receiverCommon";
import Long from "long"; import Long from "long";
import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage"; import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage";
import { Constants, RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp"; import { Constants, RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp";
@ -634,7 +634,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
this._throwIfReceiverOrConnectionClosed(); this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId); throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl; const msgImpl = message as ServiceBusMessageImpl;
return completeMessage(msgImpl, this._context, this.entityPath); return completeMessage(msgImpl, this._context, this.entityPath, this._retryOptions);
} }
async abandonMessage( async abandonMessage(
@ -644,7 +644,13 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
this._throwIfReceiverOrConnectionClosed(); this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId); throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl; const msgImpl = message as ServiceBusMessageImpl;
return abandonMessage(msgImpl, this._context, this.entityPath, propertiesToModify); return abandonMessage(
msgImpl,
this._context,
this.entityPath,
propertiesToModify,
this._retryOptions
);
} }
async deferMessage( async deferMessage(
@ -654,7 +660,13 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
this._throwIfReceiverOrConnectionClosed(); this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId); throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl; const msgImpl = message as ServiceBusMessageImpl;
return deferMessage(msgImpl, this._context, this.entityPath, propertiesToModify); return deferMessage(
msgImpl,
this._context,
this.entityPath,
propertiesToModify,
this._retryOptions
);
} }
async deadLetterMessage( async deadLetterMessage(
@ -664,7 +676,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
this._throwIfReceiverOrConnectionClosed(); this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId); throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl; const msgImpl = message as ServiceBusMessageImpl;
return deadLetterMessage(msgImpl, this._context, this.entityPath, options); return deadLetterMessage(msgImpl, this._context, this.entityPath, options, this._retryOptions);
} }
async renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date> { async renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date> {

Просмотреть файл

@ -14,7 +14,12 @@ import {
} from "../serviceBusMessage"; } from "../serviceBusMessage";
import { DispositionStatusOptions } from "../core/managementClient"; import { DispositionStatusOptions } from "../core/managementClient";
import { ConnectionContext } from "../connectionContext"; import { ConnectionContext } from "../connectionContext";
import { ErrorNameConditionMapper } from "@azure/core-amqp"; import {
ErrorNameConditionMapper,
retry,
RetryOperationType,
RetryOptions
} from "@azure/core-amqp";
import { MessageAlreadySettled } from "../util/errors"; import { MessageAlreadySettled } from "../util/errors";
import { isDefined } from "../util/typeGuards"; import { isDefined } from "../util/typeGuards";
@ -78,14 +83,17 @@ export function wrapProcessErrorHandler(
export function completeMessage( export function completeMessage(
message: ServiceBusMessageImpl, message: ServiceBusMessageImpl,
context: ConnectionContext, context: ConnectionContext,
entityPath: string entityPath: string,
retryOptions: RetryOptions | undefined
): Promise<void> { ): Promise<void> {
receiverLogger.verbose( receiverLogger.verbose(
"[%s] Completing the message with id '%s'.", "[%s] Completing the message with id '%s'.",
context.connectionId, context.connectionId,
message.messageId message.messageId
); );
return settleMessage(message, DispositionType.complete, context, entityPath); return settleMessage(message, DispositionType.complete, context, entityPath, {
retryOptions
});
} }
/** /**
@ -96,7 +104,8 @@ export function abandonMessage(
message: ServiceBusMessageImpl, message: ServiceBusMessageImpl,
context: ConnectionContext, context: ConnectionContext,
entityPath: string, entityPath: string,
propertiesToModify?: { [key: string]: any } propertiesToModify: { [key: string]: any } | undefined,
retryOptions: RetryOptions | undefined
): Promise<void> { ): Promise<void> {
receiverLogger.verbose( receiverLogger.verbose(
"[%s] Abandoning the message with id '%s'.", "[%s] Abandoning the message with id '%s'.",
@ -104,7 +113,8 @@ export function abandonMessage(
message.messageId message.messageId
); );
return settleMessage(message, DispositionType.abandon, context, entityPath, { return settleMessage(message, DispositionType.abandon, context, entityPath, {
propertiesToModify propertiesToModify,
retryOptions
}); });
} }
@ -116,7 +126,8 @@ export function deferMessage(
message: ServiceBusMessageImpl, message: ServiceBusMessageImpl,
context: ConnectionContext, context: ConnectionContext,
entityPath: string, entityPath: string,
propertiesToModify?: { [key: string]: any } propertiesToModify: { [key: string]: any } | undefined,
retryOptions: RetryOptions | undefined
): Promise<void> { ): Promise<void> {
receiverLogger.verbose( receiverLogger.verbose(
"[%s] Deferring the message with id '%s'.", "[%s] Deferring the message with id '%s'.",
@ -124,6 +135,7 @@ export function deferMessage(
message.messageId message.messageId
); );
return settleMessage(message, DispositionType.defer, context, entityPath, { return settleMessage(message, DispositionType.defer, context, entityPath, {
retryOptions,
propertiesToModify propertiesToModify
}); });
} }
@ -136,7 +148,8 @@ export function deadLetterMessage(
message: ServiceBusMessageImpl, message: ServiceBusMessageImpl,
context: ConnectionContext, context: ConnectionContext,
entityPath: string, entityPath: string,
propertiesToModify?: DeadLetterOptions & { [key: string]: any } propertiesToModify: (DeadLetterOptions & { [key: string]: any }) | undefined,
retryOptions: RetryOptions | undefined
): Promise<void> { ): Promise<void> {
receiverLogger.verbose( receiverLogger.verbose(
"[%s] Deadlettering the message with id '%s'.", "[%s] Deadlettering the message with id '%s'.",
@ -155,7 +168,8 @@ export function deadLetterMessage(
const dispositionStatusOptions: DispositionStatusOptions = { const dispositionStatusOptions: DispositionStatusOptions = {
propertiesToModify: actualPropertiesToModify, propertiesToModify: actualPropertiesToModify,
deadLetterReason: propertiesToModify?.deadLetterReason, deadLetterReason: propertiesToModify?.deadLetterReason,
deadLetterDescription: propertiesToModify?.deadLetterErrorDescription deadLetterDescription: propertiesToModify?.deadLetterErrorDescription,
retryOptions
}; };
return settleMessage( return settleMessage(
@ -169,14 +183,35 @@ export function deadLetterMessage(
/** /**
* @internal * @internal
*
*/ */
function settleMessage( export function settleMessage(
message: ServiceBusMessageImpl, message: ServiceBusMessageImpl,
operation: DispositionType, operation: DispositionType,
context: ConnectionContext, context: ConnectionContext,
entityPath: string, entityPath: string,
options?: DispositionStatusOptions options: DispositionStatusOptions,
_settleMessageOperation: typeof settleMessageOperation = settleMessageOperation
): Promise<void> {
return retry({
connectionId: context.connectionId,
operation: () => {
return _settleMessageOperation(message, operation, context, entityPath, options);
},
operationType: RetryOperationType.messageSettlement,
abortSignal: options?.abortSignal,
retryOptions: options?.retryOptions
});
}
/**
* @internal
*/
export function settleMessageOperation(
message: ServiceBusMessageImpl,
operation: DispositionType,
context: ConnectionContext,
entityPath: string,
options: DispositionStatusOptions
): Promise<void> { ): Promise<void> {
const isDeferredMessage = !message.delivery.link; const isDeferredMessage = !message.delivery.link;
const receiver = isDeferredMessage const receiver = isDeferredMessage

Просмотреть файл

@ -24,7 +24,7 @@ import {
deferMessage, deferMessage,
getMessageIterator, getMessageIterator,
wrapProcessErrorHandler wrapProcessErrorHandler
} from "./shared"; } from "./receiverCommon";
import { defaultMaxTimeAfterFirstMessageForBatchingMs, ServiceBusReceiver } from "./receiver"; import { defaultMaxTimeAfterFirstMessageForBatchingMs, ServiceBusReceiver } from "./receiver";
import Long from "long"; import Long from "long";
import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage"; import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage";
@ -503,7 +503,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
this._throwIfReceiverOrConnectionClosed(); this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId); throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl; const msgImpl = message as ServiceBusMessageImpl;
return completeMessage(msgImpl, this._context, this.entityPath); return completeMessage(msgImpl, this._context, this.entityPath, this._retryOptions);
} }
async abandonMessage( async abandonMessage(
@ -513,7 +513,13 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
this._throwIfReceiverOrConnectionClosed(); this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId); throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl; const msgImpl = message as ServiceBusMessageImpl;
return abandonMessage(msgImpl, this._context, this.entityPath, propertiesToModify); return abandonMessage(
msgImpl,
this._context,
this.entityPath,
propertiesToModify,
this._retryOptions
);
} }
async deferMessage( async deferMessage(
@ -523,7 +529,13 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
this._throwIfReceiverOrConnectionClosed(); this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId); throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl; const msgImpl = message as ServiceBusMessageImpl;
return deferMessage(msgImpl, this._context, this.entityPath, propertiesToModify); return deferMessage(
msgImpl,
this._context,
this.entityPath,
propertiesToModify,
this._retryOptions
);
} }
async deadLetterMessage( async deadLetterMessage(
@ -533,7 +545,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
this._throwIfReceiverOrConnectionClosed(); this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId); throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl; const msgImpl = message as ServiceBusMessageImpl;
return deadLetterMessage(msgImpl, this._context, this.entityPath, options); return deadLetterMessage(msgImpl, this._context, this.entityPath, options, this._retryOptions);
} }
async renewMessageLock(): Promise<Date> { async renewMessageLock(): Promise<Date> {

Просмотреть файл

@ -320,7 +320,8 @@ export class ServiceBusClient {
{ {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs, maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode, receiveMode,
abortSignal: options?.abortSignal abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions
} }
); );
@ -404,7 +405,8 @@ export class ServiceBusClient {
{ {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs, maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode, receiveMode,
abortSignal: options?.abortSignal abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions
} }
); );

Просмотреть файл

@ -5,6 +5,7 @@ import {
Constants, Constants,
ErrorNameConditionMapper, ErrorNameConditionMapper,
MessagingError, MessagingError,
RetryOptions,
StandardAbortMessage StandardAbortMessage
} from "@azure/core-amqp"; } from "@azure/core-amqp";
import { import {
@ -35,7 +36,7 @@ import {
} from "../models"; } from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { translateServiceBusError } from "../serviceBusError"; import { translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/shared"; import { abandonMessage, completeMessage } from "../receivers/receiverCommon";
import { isDefined } from "../util/typeGuards"; import { isDefined } from "../util/typeGuards";
/** /**
@ -60,6 +61,7 @@ export type MessageSessionOptions = Pick<
"maxAutoLockRenewalDurationInMs" | "abortSignal" "maxAutoLockRenewalDurationInMs" | "abortSignal"
> & { > & {
receiveMode?: ReceiveMode; receiveMode?: ReceiveMode;
retryOptions: RetryOptions | undefined;
}; };
/** /**
@ -345,6 +347,8 @@ export class MessageSession extends LinkEntity<Receiver> {
return rcvrOptions; return rcvrOptions;
} }
private _retryOptions: RetryOptions | undefined;
/** /**
* Constructs a MessageSession instance which lets you receive messages as batches * Constructs a MessageSession instance which lets you receive messages as batches
* or via callbacks using subscribe. * or via callbacks using subscribe.
@ -357,7 +361,7 @@ export class MessageSession extends LinkEntity<Receiver> {
connectionContext: ConnectionContext, connectionContext: ConnectionContext,
entityPath: string, entityPath: string,
private _providedSessionId: string | undefined, private _providedSessionId: string | undefined,
options?: MessageSessionOptions options: MessageSessionOptions
) { ) {
super(entityPath, entityPath, connectionContext, "session", logger, { super(entityPath, entityPath, connectionContext, "session", logger, {
address: entityPath, address: entityPath,
@ -367,7 +371,7 @@ export class MessageSession extends LinkEntity<Receiver> {
receiver: this.link, receiver: this.link,
logPrefix: this.logPrefix logPrefix: this.logPrefix
})); }));
if (!options) options = {}; this._retryOptions = options.retryOptions;
this.autoComplete = false; this.autoComplete = false;
if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId; if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId;
this.receiveMode = options.receiveMode || "peekLock"; this.receiveMode = options.receiveMode || "peekLock";
@ -653,7 +657,13 @@ export class MessageSession extends LinkEntity<Receiver> {
this.logPrefix, this.logPrefix,
bMessage.messageId bMessage.messageId
); );
await abandonMessage(bMessage, this._context, this.entityPath); await abandonMessage(
bMessage,
this._context,
this.entityPath,
undefined,
this._retryOptions
);
} catch (abandonError) { } catch (abandonError) {
const translatedError = translateServiceBusError(abandonError); const translatedError = translateServiceBusError(abandonError);
logger.logError( logger.logError(
@ -690,7 +700,7 @@ export class MessageSession extends LinkEntity<Receiver> {
this.logPrefix, this.logPrefix,
bMessage.messageId bMessage.messageId
); );
await completeMessage(bMessage, this._context, this.entityPath); await completeMessage(bMessage, this._context, this.entityPath, this._retryOptions);
} catch (completeError) { } catch (completeError) {
const translatedError = translateServiceBusError(completeError); const translatedError = translateServiceBusError(completeError);
logger.logError( logger.logError(
@ -802,10 +812,9 @@ export class MessageSession extends LinkEntity<Receiver> {
async settleMessage( async settleMessage(
message: ServiceBusMessageImpl, message: ServiceBusMessageImpl,
operation: DispositionType, operation: DispositionType,
options?: DispositionStatusOptions options: DispositionStatusOptions
): Promise<any> { ): Promise<any> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!options) options = {};
if (operation.match(/^(complete|abandon|defer|deadletter)$/) == null) { if (operation.match(/^(complete|abandon|defer|deadletter)$/) == null) {
return reject(new Error(`operation: '${operation}' is not a valid operation.`)); return reject(new Error(`operation: '${operation}' is not a valid operation.`));
} }
@ -870,7 +879,7 @@ export class MessageSession extends LinkEntity<Receiver> {
context: ConnectionContext, context: ConnectionContext,
entityPath: string, entityPath: string,
sessionId: string | undefined, sessionId: string | undefined,
options?: MessageSessionOptions options: MessageSessionOptions
): Promise<MessageSession> { ): Promise<MessageSession> {
throwErrorIfConnectionClosed(context); throwErrorIfConnectionClosed(context);
const messageSession = new MessageSession(context, entityPath, sessionId, options); const messageSession = new MessageSession(context, entityPath, sessionId, options);

Просмотреть файл

@ -345,7 +345,9 @@ describe("AbortSignal", () => {
it("SessionReceiver.subscribe", async () => { it("SessionReceiver.subscribe", async () => {
const connectionContext = createConnectionContextForTestsWithSessionId(); const connectionContext = createConnectionContextForTestsWithSessionId();
const messageSession = await MessageSession.create(connectionContext, "entityPath", "hello"); const messageSession = await MessageSession.create(connectionContext, "entityPath", "hello", {
retryOptions: undefined
});
const session = new ServiceBusSessionReceiverImpl( const session = new ServiceBusSessionReceiverImpl(
messageSession, messageSession,

Просмотреть файл

@ -60,7 +60,8 @@ describe("Message session unit tests", () => {
"dummyEntityPath", "dummyEntityPath",
undefined, undefined,
{ {
receiveMode: lockMode receiveMode: lockMode,
retryOptions: undefined
} }
); );
@ -89,7 +90,8 @@ describe("Message session unit tests", () => {
"dummyEntityPath", "dummyEntityPath",
undefined, undefined,
{ {
receiveMode: lockMode receiveMode: lockMode,
retryOptions: undefined
} }
); );
@ -118,7 +120,8 @@ describe("Message session unit tests", () => {
"dummyEntityPath", "dummyEntityPath",
undefined, undefined,
{ {
receiveMode: lockMode receiveMode: lockMode,
retryOptions: undefined
} }
); );
@ -163,7 +166,8 @@ describe("Message session unit tests", () => {
"dummyEntityPath", "dummyEntityPath",
undefined, undefined,
{ {
receiveMode: lockMode receiveMode: lockMode,
retryOptions: undefined
} }
); );
@ -214,7 +218,8 @@ describe("Message session unit tests", () => {
"dummyEntityPath", "dummyEntityPath",
undefined, undefined,
{ {
receiveMode: lockMode receiveMode: lockMode,
retryOptions: undefined
} }
); );
@ -351,7 +356,8 @@ describe("Message session unit tests", () => {
"entity path", "entity path",
"session id", "session id",
{ {
receiveMode: "receiveAndDelete" receiveMode: "receiveAndDelete",
retryOptions: undefined
} }
); );

Просмотреть файл

@ -302,7 +302,10 @@ describe("Receiver unit tests", () => {
const messageSession = await MessageSession.create( const messageSession = await MessageSession.create(
connectionContext, connectionContext,
"entity path", "entity path",
undefined undefined,
{
retryOptions: undefined
}
); );
const impl = new ServiceBusSessionReceiverImpl( const impl = new ServiceBusSessionReceiverImpl(

Просмотреть файл

@ -1,16 +1,27 @@
// Copyright (c) Microsoft Corporation. // Copyright (c) Microsoft Corporation.
// Licensed under the MIT license. // Licensed under the MIT license.
import { getMessageIterator, wrapProcessErrorHandler } from "../../../src/receivers/shared"; import {
getMessageIterator,
settleMessage,
settleMessageOperation,
wrapProcessErrorHandler
} from "../../../src/receivers/receiverCommon";
import chai from "chai"; import chai from "chai";
import { ServiceBusReceiver } from "../../../src/receivers/receiver"; import { ServiceBusReceiver } from "../../../src/receivers/receiver";
import { ServiceBusLogger } from "../../../src/log"; import { ServiceBusLogger } from "../../../src/log";
import { ProcessErrorArgs } from "../../../src/models"; import { ProcessErrorArgs } from "../../../src/models";
import { ServiceBusError, translateServiceBusError } from "../../../src/serviceBusError"; import { ServiceBusError, translateServiceBusError } from "../../../src/serviceBusError";
import { MessagingError } from "@azure/core-amqp"; import { MessagingError } from "@azure/core-amqp";
import { DispositionType, ServiceBusMessageImpl } from "../../../src/serviceBusMessage";
import { ConnectionContext } from "../../../src/connectionContext";
import { DispositionStatusOptions } from "../../../src/core/managementClient";
import { Delivery } from "rhea-promise";
import { MessageAlreadySettled } from "../../../src/util/errors";
import { assertThrows } from "../../public/utils/testUtils";
const assert = chai.assert; const assert = chai.assert;
describe("shared", () => { describe("shared receiver code", () => {
describe("translateServiceBusError", () => { describe("translateServiceBusError", () => {
[ [
new Error("Plain error"), new Error("Plain error"),
@ -79,6 +90,103 @@ describe("shared", () => {
); );
}); });
}); });
describe("settleMessage", () => {
it("retry options are used and arguments plumbed through", async () => {
const expectedFakeMessage = ({} as any) as ServiceBusMessageImpl;
const expectedFakeContext = ({
connectionId: "hello"
} as any) as ConnectionContext;
let numTimesCalled = 0;
await settleMessage(
expectedFakeMessage,
DispositionType.deadletter,
expectedFakeContext,
"entityPath",
{
retryOptions: {
maxRetries: 1,
retryDelayInMs: 0
},
sessionId: "here just to prove that we're propagating options"
},
async (
message: ServiceBusMessageImpl,
operation: DispositionType,
context: ConnectionContext,
entityPath: string,
options: DispositionStatusOptions
) => {
++numTimesCalled;
assert.deepEqual(message, expectedFakeMessage);
assert.deepEqual(context, expectedFakeContext);
assert.deepEqual(operation, DispositionType.deadletter);
assert.deepEqual(entityPath, "entityPath");
assert.deepEqual(options.sessionId, "here just to prove that we're propagating options");
if (numTimesCalled < 2) {
const err = new Error("Force retries until the last iteration");
(err as any).retryable = true;
throw err;
}
}
);
assert.equal(numTimesCalled, 2);
});
it("already settled message throws message indicating lock was lost (non-session)", async () => {
const fakeMessage = ({
delivery: {
remote_settled: true
} as Delivery
} as any) as ServiceBusMessageImpl;
await assertThrows(
() =>
settleMessageOperation(
fakeMessage,
DispositionType.defer,
{} as ConnectionContext,
"entityPath",
{
retryOptions: undefined
}
),
{
message: MessageAlreadySettled
}
);
});
it("already settled message throws message indicating lock was lost (session)", async () => {
const fakeMessage = ({
sessionId: "any session id",
delivery: {
remote_settled: true
} as Delivery
} as any) as ServiceBusMessageImpl;
await assertThrows(
() =>
settleMessageOperation(
fakeMessage,
DispositionType.defer,
{} as ConnectionContext,
"entityPath",
{
retryOptions: undefined
}
),
{
message: MessageAlreadySettled
}
);
});
});
}); });
it("error handler wrapper", () => { it("error handler wrapper", () => {

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license. // Licensed under the MIT license.
import chai from "chai"; import chai from "chai";
const assert = chai.assert;
import { ServiceBusReceivedMessage, ServiceBusMessage, delay } from "../../../src"; import { ServiceBusReceivedMessage, ServiceBusMessage, delay } from "../../../src";
import * as dotenv from "dotenv"; import * as dotenv from "dotenv";
dotenv.config(); dotenv.config();
@ -197,3 +198,31 @@ export enum EntityNames {
MANAGEMENT_NEW_ENTITY_1 = "management-new-entity-1", MANAGEMENT_NEW_ENTITY_1 = "management-new-entity-1",
MANAGEMENT_NEW_ENTITY_2 = "management-new-entity-2" MANAGEMENT_NEW_ENTITY_2 = "management-new-entity-2"
} }
/**
* Asserts that `fn` throws an error and assert.deepEqual compares all fields common
* between `expectedErr` and `err`.
*
* @param fn A function to execute.
* @param expectedErr The error fields you expect.
* @returns The error thrown, if equal to expectedErr.
*/
export async function assertThrows<T>(
fn: () => Promise<T>,
expectedErr: Record<string, any>
): Promise<Error> {
try {
await fn();
} catch (err) {
const comparableObj: Record<string, any> = {};
for (const k in expectedErr) {
comparableObj[k] = err[k];
}
assert.deepEqual(comparableObj, expectedErr);
return err;
}
throw new Error("assert failure: error was expected, but none was thrown");
}