[Service Bus] Update client side validation for settling msg, renewing locks (#12404)

This commit is contained in:
Ramya Rao 2020-11-10 12:41:08 -08:00 коммит произвёл GitHub
Родитель 6a97120b18
Коммит 007104004f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 150 добавлений и 139 удалений

Просмотреть файл

@ -14,11 +14,11 @@ import { ServiceBusReceivedMessage } from "../serviceBusMessage";
import { ConnectionContext } from "../connectionContext";
import {
getAlreadyReceivingErrorMsg,
getErrorMessageNotSupportedInReceiveAndDeleteMode,
getReceiverClosedErrorMsg,
throwErrorIfConnectionClosed,
throwTypeErrorIfParameterMissing,
throwTypeErrorIfParameterNotLong
throwTypeErrorIfParameterNotLong,
throwErrorIfInvalidOperationOnMessage
} from "../util/errors";
import { OnError, OnMessage, ReceiveOptions } from "../core/messageReceiver";
import { StreamingReceiverInitArgs, StreamingReceiver } from "../core/streamingReceiver";
@ -155,7 +155,7 @@ export interface ServiceBusReceiver {
* if the lock on the message has expired or the AMQP link with which the message was received is
* no longer alive. The latter can happen if the receiver was explicitly closed by the user or the
* AMQP link got closed by the library due to network loss or service error.
* @throws Error if the message is already settled. To avoid this error check the `isSettled`
* @throws Error if the message is already settled.
* property on the message if you are not sure whether the message is settled.
* @throws Error if used in `receiveAndDelete` mode because all messages received in this mode
* are pre-settled. To avoid this error, update your code to not settle a message which is received
@ -178,7 +178,7 @@ export interface ServiceBusReceiver {
* if the lock on the message has expired or the AMQP link with which the message was received is
* no longer alive. The latter can happen if the receiver was explicitly closed by the user or the
* AMQP link got closed by the library due to network loss or service error.
* @throws Error if the message is already settled. To avoid this error check the `isSettled`
* @throws Error if the message is already settled.
* property on the message if you are not sure whether the message is settled.
* @throws Error if used in `receiveAndDelete` mode because all messages received in this mode
* are pre-settled. To avoid this error, update your code to not settle a message which is received
@ -206,7 +206,7 @@ export interface ServiceBusReceiver {
* if the lock on the message has expired or the AMQP link with which the message was received is
* no longer alive. The latter can happen if the receiver was explicitly closed by the user or the
* AMQP link got closed by the library due to network loss or service error.
* @throws Error if the message is already settled. To avoid this error check the `isSettled`
* @throws Error if the message is already settled.
* property on the message if you are not sure whether the message is settled.
* @throws Error if used in `receiveAndDelete` mode because all messages received in this mode
* are pre-settled. To avoid this error, update your code to not settle a message which is received
@ -234,7 +234,7 @@ export interface ServiceBusReceiver {
* if the lock on the message has expired or the AMQP link with which the message was received is
* no longer alive. The latter can happen if the receiver was explicitly closed by the user or the
* AMQP link got closed by the library due to network loss or service error.
* @throws Error if the message is already settled. To avoid this error check the `isSettled`
* @throws Error if the message is already settled.
* property on the message if you are not sure whether the message is settled.
* @throws Error if used in `receiveAndDelete` mode because all messages received in this mode
* are pre-settled. To avoid this error, update your code to not settle a message which is received
@ -624,6 +624,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
}
async completeMessage(message: ServiceBusReceivedMessage): Promise<void> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
return completeMessage(msgImpl, this._context, this.entityPath);
}
@ -632,6 +634,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
message: ServiceBusReceivedMessage,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
return abandonMessage(msgImpl, this._context, this.entityPath, propertiesToModify);
}
@ -640,6 +644,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
message: ServiceBusReceivedMessage,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
return deferMessage(msgImpl, this._context, this.entityPath, propertiesToModify);
}
@ -648,35 +654,19 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
message: ServiceBusReceivedMessage,
options?: DeadLetterOptions & { [key: string]: any }
): Promise<void> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
return deadLetterMessage(msgImpl, this._context, this.entityPath, options);
}
async renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
if (!msgImpl.delivery) {
throw new Error("A peeked message does not have a lock to be renewed.");
}
let associatedLinkName: string | undefined;
let error: Error | undefined;
if (!message.lockToken) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`)
);
} else if (msgImpl.delivery.remote_settled) {
error = new Error(`Failed to renew the lock as this message is already settled.`);
}
if (error) {
logger.logError(
error,
"[%s] An error occurred when renewing the lock on the message with id '%s'",
this._context.connectionId,
message.messageId
);
throw error;
}
if (msgImpl.delivery.link) {
const associatedReceiver = this._context.getReceiverFromCache(msgImpl.delivery.link.name);
associatedLinkName = associatedReceiver?.name;

Просмотреть файл

@ -10,7 +10,8 @@ import {
getReceiverClosedErrorMsg,
throwErrorIfConnectionClosed,
throwTypeErrorIfParameterMissing,
throwTypeErrorIfParameterNotLong
throwTypeErrorIfParameterNotLong,
throwErrorIfInvalidOperationOnMessage
} from "../util/errors";
import { OnError, OnMessage } from "../core/messageReceiver";
import {
@ -494,6 +495,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
}
async completeMessage(message: ServiceBusReceivedMessage): Promise<void> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
return completeMessage(msgImpl, this._context, this.entityPath);
}
@ -502,6 +505,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
message: ServiceBusReceivedMessage,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
return abandonMessage(msgImpl, this._context, this.entityPath, propertiesToModify);
}
@ -510,6 +515,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
message: ServiceBusReceivedMessage,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
return deferMessage(msgImpl, this._context, this.entityPath, propertiesToModify);
}
@ -518,6 +525,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
message: ServiceBusReceivedMessage,
options?: DeadLetterOptions & { [key: string]: any }
): Promise<void> {
this._throwIfReceiverOrConnectionClosed();
throwErrorIfInvalidOperationOnMessage(message, this.receiveMode, this._context.connectionId);
const msgImpl = message as ServiceBusMessageImpl;
return deadLetterMessage(msgImpl, this._context, this.entityPath, options);
}

Просмотреть файл

@ -14,8 +14,8 @@ import {
} from "../serviceBusMessage";
import { DispositionStatusOptions } from "../core/managementClient";
import { ConnectionContext } from "../connectionContext";
import { getErrorMessageNotSupportedInReceiveAndDeleteMode } from "../util/errors";
import { ErrorNameConditionMapper } from "@azure/core-amqp";
import { MessageAlreadySettled } from "../util/errors";
/**
* @internal
@ -202,14 +202,29 @@ function settleMessage(
entityPath: string,
options?: DispositionStatusOptions
): Promise<void> {
if (!message.delivery) {
throw new Error("A peeked message cannot be settled.");
const isDeferredMessage = !message.delivery.link;
const receiver = isDeferredMessage
? undefined
: context.getReceiverFromCache(message.delivery.link.name, message.sessionId);
const associatedLinkName = receiver?.name;
let error: Error | undefined;
if (message.delivery.remote_settled) {
error = new Error(MessageAlreadySettled);
} else if (
!isDeferredMessage &&
(!receiver || !receiver.isOpen()) &&
message.sessionId != undefined
) {
error = translateServiceBusError({
description:
`Failed to ${operation} the message as the AMQP link with which the message was ` +
`received is no longer alive.`,
condition: ErrorNameConditionMapper.SessionLockLostError
});
}
if (!message.lockToken) {
const error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`)
);
if (error) {
receiverLogger.logError(
error,
"[%s] An error occurred when settling a message with id '%s'",
@ -218,41 +233,6 @@ function settleMessage(
);
throw error;
}
const isDeferredMessage = !message.delivery.link;
const receiver = isDeferredMessage
? undefined
: context.getReceiverFromCache(message.delivery.link.name, message.sessionId);
const associatedLinkName = receiver?.name;
if (!isDeferredMessage) {
// In case the message wasn't from a deferred queue,
// 1. We can verify the remote_settled flag on the delivery
// - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link)
// - If the flag is false, we can't say that the message has not been settled
// since settling with the management link won't update the delivery (In this case, service would throw an error)
// 2. If the message has a session-id and if the associated receiver link is unavailable,
// then throw an error since we need a lock on the session to settle the message.
let error: Error | undefined;
if (message.delivery.remote_settled) {
error = new Error(`Failed to ${operation} the message as this message is already settled.`);
} else if ((!receiver || !receiver.isOpen()) && message.sessionId != undefined) {
error = translateServiceBusError({
description:
`Failed to ${operation} the message as the AMQP link with which the message was ` +
`received is no longer alive.`,
condition: ErrorNameConditionMapper.SessionLockLostError
});
}
if (error) {
receiverLogger.logError(
error,
"[%s] An error occurred when settling a message with id '%s'",
context.connectionId,
message.messageId
);
throw error;
}
}
// Message Settlement with managementLink
// 1. If the received message is deferred as such messages can only be settled using managementLink
@ -260,7 +240,7 @@ function settleMessage(
if (isDeferredMessage || ((!receiver || !receiver.isOpen()) && message.sessionId == undefined)) {
return context
.getManagementClient(entityPath)
.updateDispositionStatus(message.lockToken, operation, {
.updateDispositionStatus(message.lockToken!, operation, {
...options,
associatedLinkName,
sessionId: message.sessionId

Просмотреть файл

@ -758,14 +758,6 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
* @readonly
*/
readonly deadLetterErrorDescription?: string;
/**
* @property Boolean denoting if the message has already been settled.
* @readonly
*/
public get isSettled(): boolean {
return this.delivery.remote_settled;
}
/**
* @internal
*/

Просмотреть файл

@ -1,9 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { logger } from "../log";
import { logger, receiverLogger } from "../log";
import Long from "long";
import { ConnectionContext } from "../connectionContext";
import { ServiceBusReceivedMessage } from "../serviceBusMessage";
import { ReceiveMode } from "../models";
/**
* Error message to use when EntityPath in connection string does not match the
@ -190,10 +192,51 @@ export function throwTypeErrorIfParameterIsEmptyString(
/**
* @internal
* @ignore
* Gets error message for when an operation is not supported in ReceiveAndDelete mode
* @param failedToDo A string to add to the placeholder in the error message. Denotes the action
* that is not supported in ReceiveAndDelete mode
* The error message for operations on the receiver that are invalid for a message received in receiveAndDelete mode.
*/
export function getErrorMessageNotSupportedInReceiveAndDeleteMode(failedToDo: string): string {
return `Failed to ${failedToDo} as the operation is only supported in 'PeekLock' receive mode.`;
export const InvalidOperationInReceiveAndDeleteMode =
"The operation is not supported in 'receiveAndDelete' receive mode.";
/**
* @internal
* @ignore
* The error message for operations on the receiver that are invalid for a peeked message.
*/
export const InvalidOperationForPeekedMessage =
"This operation is not supported for peeked messages. Only messages received using 'receiveMessages()', 'subscribe()' and 'getMessageIterator()' methods on the receiver in 'peekLock' receive mode can be settled.";
/**
* @internal
* @ignore
* The error message for when one attempts to settle an already settled message.
*/
export const MessageAlreadySettled = "The message has either been deleted or already settled";
/**
* Throws error if the ServiceBusReceivedMessage cannot be settled.
* @internal
* @ignore
*/
export function throwErrorIfInvalidOperationOnMessage(
message: ServiceBusReceivedMessage,
receiveMode: ReceiveMode,
connectionId: string
) {
let error: Error | undefined;
if (receiveMode === "receiveAndDelete") {
error = new Error(InvalidOperationInReceiveAndDeleteMode);
} else if (!message.lockToken) {
error = new Error(InvalidOperationForPeekedMessage);
}
if (error) {
receiverLogger.logError(
error,
"[%s] An error occurred for message with id '%s'",
connectionId,
message.messageId
);
throw error;
}
}

Просмотреть файл

@ -258,7 +258,7 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink",
? TestMessage.getSessionSample()
: TestMessage.getSample();
const msg = await sendReceiveMsg(testMessages);
await receiver.close();
const msgDeliveryLink = (msg as ServiceBusMessageImpl).delivery.link.name;
if (entityNames.usesSessions) {

Просмотреть файл

@ -5,7 +5,7 @@ import chai from "chai";
import Long from "long";
import chaiAsPromised from "chai-as-promised";
import { ServiceBusMessage, delay, ProcessErrorArgs } from "../src";
import { getAlreadyReceivingErrorMsg } from "../src/util/errors";
import { getAlreadyReceivingErrorMsg, InvalidOperationForPeekedMessage } from "../src/util/errors";
import { TestClientType, TestMessage } from "./utils/testUtils";
import { ServiceBusReceiver, ServiceBusReceiverImpl } from "../src/receivers/receiver";
import { ServiceBusSender } from "../src/sender";
@ -38,12 +38,7 @@ let deadLetterReceiver: ServiceBusReceiver;
async function beforeEachTest(entityType: TestClientType): Promise<void> {
entityNames = await serviceBusClient.test.createTestEntities(entityType);
receiver = await serviceBusClient.test.createPeekLockReceiver(entityNames, {
// prior to a recent change the behavior was always to _not_ auto-renew locks.
// for compat with these tests I'm just disabling this. There are tests in renewLocks.spec.ts that
// ensure lock renewal does work with batching.
maxAutoLockRenewalDurationInMs: 0
});
receiver = await serviceBusClient.test.createPeekLockReceiver(entityNames);
sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
@ -389,13 +384,18 @@ describe("Batching Receiver", () => {
await sender.sendMessages(testMessages);
const [peekedMsg] = await receiver.peekMessages(1);
if (!peekedMsg) {
// Sometimes the peek call does not return any messages :(
return;
}
should.equal(
!(peekedMsg as any)["delivery"],
!peekedMsg.lockToken,
true,
"Peeked msg was not meant to have delivery! We use this assumption to differentiate between peeked msg and other messages."
"Peeked msg was not meant to have lockToken! We use this assumption to differentiate between peeked msg and other messages."
);
const expectedErrorMsg = "A peeked message cannot be settled.";
const expectedErrorMsg = InvalidOperationForPeekedMessage;
try {
await receiver.completeMessage(peekedMsg);
assert.fail("completeMessage should have failed");
@ -782,11 +782,10 @@ describe("Batching Receiver", () => {
: TestMessage.getSample();
await sender.sendMessages(testMessages);
// If using sessions, we need a receiver with lock renewal disabled so that
// We need a receiver with lock renewal disabled so that
// the message lands back in the queue/subscription to be picked up again.
await receiver.close();
if (entityNames.usesSessions) {
await receiver.close();
receiver = await serviceBusClient.test.acceptSessionWithPeekLock(
entityNames,
testMessages.sessionId!,
@ -794,6 +793,10 @@ describe("Batching Receiver", () => {
maxAutoLockRenewalDurationInMs: 0
}
);
} else {
receiver = await serviceBusClient.test.createPeekLockReceiver(entityNames, {
maxAutoLockRenewalDurationInMs: 0
});
}
let batch = await receiver.receiveMessages(1);

Просмотреть файл

@ -106,7 +106,7 @@ describe("Deferred Messages", () => {
should.equal(
!!(deferredMsg as any)["delivery"],
true,
"Deferred msg should have delivery! We use this assumption to differentiate between peeked msg and other messages."
"Deferred msg should have delivery! We use this assumption to differentiate between deferred msg and other messages when settling."
);
should.equal(deferredMsg.body, testMessage.body, "MessageBody is different than expected");
should.equal(

Просмотреть файл

@ -15,7 +15,7 @@ import {
import { TestClientType, TestMessage, checkWithTimeout } from "./utils/testUtils";
import { getErrorMessageNotSupportedInReceiveAndDeleteMode } from "../src/util/errors";
import { InvalidOperationInReceiveAndDeleteMode } from "../src/util/errors";
import { ServiceBusSender } from "../src/sender";
import {
EntityName,
@ -242,9 +242,9 @@ describe("receive and delete", () => {
return msgs[0];
}
const testError = (err: Error, operation: DispositionType): void => {
expect(err.message.toLowerCase(), "ErrorMessage is different than expected").includes(
`failed to ${operation} the message as the operation is only supported in \'peeklock\' receive mode.`
const testError = (err: Error): void => {
expect(err.message, "ErrorMessage is different than expected").equals(
InvalidOperationInReceiveAndDeleteMode
);
};
@ -266,7 +266,7 @@ describe("receive and delete", () => {
}
} catch (err) {
errorWasThrown = true;
testError(err, operation);
testError(err);
}
should.equal(errorWasThrown, true, "Error thrown flag must be true");
@ -320,7 +320,7 @@ describe("receive and delete", () => {
await receiver.renewMessageLock(msg).catch((err) => {
should.equal(
err.message,
getErrorMessageNotSupportedInReceiveAndDeleteMode("renew the lock on the message"),
InvalidOperationInReceiveAndDeleteMode,
"ErrorMessage is different than expected"
);
errorWasThrown = true;
@ -453,9 +453,9 @@ describe("receive and delete", () => {
return deferredMsg;
}
const testError = (err: Error, operation: DispositionType): void => {
expect(err.message.toLowerCase(), "ErrorMessage is different than expected").includes(
`failed to ${operation} the message as the operation is only supported in \'peeklock\' receive mode.`
const testError = (err: Error): void => {
expect(err.message, "ErrorMessage is different than expected").equals(
InvalidOperationInReceiveAndDeleteMode
);
};
@ -477,7 +477,7 @@ describe("receive and delete", () => {
}
} catch (err) {
errorWasThrown = true;
testError(err, operation);
testError(err);
}
should.equal(errorWasThrown, true, "Error thrown flag must be true");
@ -521,7 +521,7 @@ describe("receive and delete", () => {
await receiver.renewMessageLock(deferredMsg).catch((err) => {
should.equal(
err.message,
getErrorMessageNotSupportedInReceiveAndDeleteMode("renew the lock on the message"),
InvalidOperationInReceiveAndDeleteMode,
"ErrorMessage is different than expected"
);
errorWasThrown = true;

Просмотреть файл

@ -18,6 +18,7 @@ import { ServiceBusReceiver } from "../src/receivers/receiver";
import { ServiceBusSender } from "../src/sender";
import { ServiceBusReceivedMessage } from "../src/serviceBusMessage";
import { ProcessErrorArgs } from "../src/models";
import { InvalidOperationForPeekedMessage } from "../src/util/errors";
describe("Message Lock Renewal", () => {
let serviceBusClient: ServiceBusClientForTests;
@ -78,7 +79,7 @@ describe("Message Lock Renewal", () => {
await receiver.renewMessageLock(peekedMsg);
assert.fail("renewMessageLock should have failed");
} catch (error) {
should.equal(error.message, "A peeked message does not have a lock to be renewed.");
should.equal(error.message, InvalidOperationForPeekedMessage);
}
// Clean up any left over messages

Просмотреть файл

@ -501,9 +501,10 @@ describe("ServiceBusClient live tests", () => {
caughtError = error;
}
const expectedErrorMsg =
`Failed to ${operation} the message as the AMQP link with which the message was ` +
`received is no longer alive.`;
const expectedErrorMsg = getReceiverClosedErrorMsg(
receiver.entityPath,
receivedMessage.sessionId
);
should.equal(caughtError && caughtError.message, expectedErrorMsg);
}
@ -730,6 +731,8 @@ describe("ServiceBusClient live tests", () => {
await beforeEachTest(noSessionTestClientType, entityToClose);
await testReceiver(getReceiverClosedErrorMsg(receiver.entityPath));
await testAllDispositions();
});
it(

Просмотреть файл

@ -4,7 +4,7 @@
import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { ServiceBusReceivedMessage, delay, ProcessErrorArgs } from "../src";
import { getAlreadyReceivingErrorMsg } from "../src/util/errors";
import { getAlreadyReceivingErrorMsg, MessageAlreadySettled } from "../src/util/errors";
import { TestMessage, checkWithTimeout, TestClientType } from "./utils/testUtils";
import { DispositionType, ServiceBusMessageImpl } from "../src/serviceBusMessage";
import { ServiceBusReceiver } from "../src/receivers/receiver";
@ -494,12 +494,8 @@ describe("Streaming Receiver Tests", () => {
await beforeEachTest();
});
const testError = (err: Error, operation: DispositionType): void => {
should.equal(
err.message,
`Failed to ${operation} the message as this message is already settled.`,
"ErrorMessage is different than expected"
);
const testError = (err: Error): void => {
should.equal(err.message, MessageAlreadySettled, "ErrorMessage is different than expected");
errorWasThrown = true;
};
@ -544,13 +540,13 @@ describe("Streaming Receiver Tests", () => {
await testPeekMsgsLength(receiver, 0);
if (operation === DispositionType.complete) {
await receiver.completeMessage(receivedMsgs[0]).catch((err) => testError(err, operation));
await receiver.completeMessage(receivedMsgs[0]).catch((err) => testError(err));
} else if (operation === DispositionType.abandon) {
await receiver.abandonMessage(receivedMsgs[0]).catch((err) => testError(err, operation));
await receiver.abandonMessage(receivedMsgs[0]).catch((err) => testError(err));
} else if (operation === DispositionType.deadletter) {
await receiver.deadLetterMessage(receivedMsgs[0]).catch((err) => testError(err, operation));
await receiver.deadLetterMessage(receivedMsgs[0]).catch((err) => testError(err));
} else if (operation === DispositionType.defer) {
await receiver.deferMessage(receivedMsgs[0]).catch((err) => testError(err, operation));
await receiver.deferMessage(receivedMsgs[0]).catch((err) => testError(err));
}
should.equal(errorWasThrown, true, "Error thrown flag must be true");

Просмотреть файл

@ -4,7 +4,7 @@
import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { ServiceBusReceivedMessage, delay, ProcessErrorArgs } from "../src";
import { getAlreadyReceivingErrorMsg } from "../src/util/errors";
import { getAlreadyReceivingErrorMsg, MessageAlreadySettled } from "../src/util/errors";
import { TestClientType, TestMessage, checkWithTimeout } from "./utils/testUtils";
import { DispositionType } from "../src/serviceBusMessage";
import {
@ -530,12 +530,8 @@ describe("Streaming with sessions", () => {
await beforeEachTest();
});
const testError = (err: Error, operation: DispositionType): void => {
should.equal(
err.message,
`Failed to ${operation} the message as this message is already settled.`,
"ErrorMessage is different than expected"
);
const testError = (err: Error): void => {
should.equal(err.message, MessageAlreadySettled, "ErrorMessage is different than expected");
errorWasThrown = true;
};
@ -581,15 +577,13 @@ describe("Streaming with sessions", () => {
await testPeekMsgsLength(receiver, 0);
if (operation === DispositionType.complete) {
await receiver.completeMessage(receivedMsgs[0]).catch((err) => testError(err, operation));
await receiver.completeMessage(receivedMsgs[0]).catch((err) => testError(err));
} else if (operation === DispositionType.abandon) {
await receiver.abandonMessage(receivedMsgs[0]).catch((err) => testError(err, operation));
await receiver.abandonMessage(receivedMsgs[0]).catch((err) => testError(err));
} else if (operation === DispositionType.deadletter) {
await receiver
.deadLetterMessage(receivedMsgs[0])
.catch((err) => testError(err, operation));
await receiver.deadLetterMessage(receivedMsgs[0]).catch((err) => testError(err));
} else if (operation === DispositionType.defer) {
await receiver.deferMessage(receivedMsgs[0]).catch((err) => testError(err, operation));
await receiver.deferMessage(receivedMsgs[0]).catch((err) => testError(err));
}
should.equal(errorWasThrown, true, "Error thrown flag must be true");