[service-bus] Pass in ProcessErrorArgs to subscribe({ processError }) for extra diagnostics (#11927)
Users that want the simplest and most robust experience possible are encouraged to user `Receiver.subscribe` when writing their receive loops. This breaks things down to just passing us a couple of callbacks. As part of an overall effort this commit adds in some extra context information (mimicking what we're doing in the .NET ServiceBusProcessor) to let users make better decisions in their processError handler by passing in a `ProcessErrorArgs` object, which gives the user some extra context about where the error originated from (`errorSource`) and the host and entity path.
This commit is contained in:
Родитель
6078ce8c17
Коммит
2da4402790
|
@ -8,6 +8,9 @@
|
|||
[PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651)
|
||||
and
|
||||
[PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)
|
||||
- The `processError` passed to `Receiver.subscribe` now receives a `ProcessErrorArgs` instead of just an error. This parameter provides additional context that can make it simpler to distinguish
|
||||
errors that were thrown from your callback (via the `errorSource` member of `ProcessErrorArgs`) as well as giving you some information about the entity that generated the error.
|
||||
[PR 11927](https://github.com/Azure/azure-sdk-for-js/pull/11927)
|
||||
|
||||
- Added new "userId" property to `ServiceBusMessage` interface. [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)
|
||||
|
||||
|
@ -52,6 +55,7 @@
|
|||
- `acceptSession`, which opens a session by name
|
||||
- `acceptNextSession`, which opens the next available session, determined by Service Bus.
|
||||
- as part of this `CreateSessionReceiverOptions` has been renamed to `AcceptSessionReceiverOptions` to conform to guidelines.
|
||||
- The `processError` handler passed to `Receiver.subscribe` now takes a `ProcessErrorArgs` instead of just an error.
|
||||
|
||||
## 7.0.0-preview.6 (2020-09-10)
|
||||
|
||||
|
|
|
@ -112,7 +112,7 @@ To interact with these resources, one should be familiar with the following SDK
|
|||
|
||||
- Send messages, to a queue or topic, using a [`Sender`][sender] created using [`ServiceBusClient.createSender()`][sbclient_createsender].
|
||||
- Receive messages, from either a queue or a subscription, using a [`Receiver`][receiver] created using [`ServiceBusClient.createReceiver()`][sbclient_createreceiver].
|
||||
- Receive messages, from session enabled queues or subscriptions, using a [`SessionReceiver`][sessionreceiver] created using [`ServiceBusClient.acceptSession()`][sbclient_createsessionreceiver].
|
||||
- Receive messages, from session enabled queues or subscriptions, using a [`SessionReceiver`][sessionreceiver] created using [`ServiceBusClient.acceptSession()`][sbclient_acceptsession].
|
||||
|
||||
Please note that the Queues, Topics and Subscriptions should be created prior to using this library.
|
||||
|
||||
|
@ -133,7 +133,7 @@ The following sections provide code snippets that cover some of the common tasks
|
|||
Once you have created an instance of a `ServiceBusClient` class, you can get a `Sender`
|
||||
using the [createSender][sbclient_createsender] method.
|
||||
|
||||
This gives you a sender which you can use to [send][sender_send] messages.
|
||||
This gives you a sender which you can use to [send][sender_sendmessages] messages.
|
||||
|
||||
```javascript
|
||||
const sender = serviceBusClient.createSender("my-queue");
|
||||
|
@ -175,7 +175,7 @@ You can use this receiver in one of 3 ways to receive messages:
|
|||
|
||||
#### Get an array of messages
|
||||
|
||||
Use the [receiveMessages][receiverreceivebatch] function which returns a promise that
|
||||
Use the [receiveMessages][receiver_receivemessages] function which returns a promise that
|
||||
resolves to an array of messages.
|
||||
|
||||
```javascript
|
||||
|
@ -192,9 +192,13 @@ When you are done, call `receiver.close()` to stop receiving any more messages.
|
|||
```javascript
|
||||
const myMessageHandler = async (message) => {
|
||||
// your code here
|
||||
console.log(`message.body: ${message.body}`);
|
||||
};
|
||||
const myErrorHandler = async (error) => {
|
||||
console.log(error);
|
||||
const myErrorHandler = async (args) => {
|
||||
console.log(
|
||||
`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
|
||||
args.error
|
||||
);
|
||||
};
|
||||
receiver.subscribe({
|
||||
processMessage: myMessageHandler,
|
||||
|
@ -225,7 +229,7 @@ To learn more, please read [Settling Received Messages](https://docs.microsoft.c
|
|||
> read more about how to configure this feature in the portal [here][docsms_messagesessions_fifo].
|
||||
|
||||
In order to send messages to a session, use the `ServiceBusClient` to create a sender using
|
||||
[createSender][sbclient_createsender]. This gives you a sender which you can use to [send][sender_send] messages.
|
||||
[createSender][sbclient_createsender]. This gives you a sender which you can use to [send][sender_sendmessages] messages.
|
||||
|
||||
When sending the message, set the `sessionId` property in the message to ensure
|
||||
your message lands in the right session.
|
||||
|
@ -374,19 +378,19 @@ If you'd like to contribute to this library, please read the [contributing guide
|
|||
[azure_identity]: https://github.com/Azure/azure-sdk-for-js/blob/master/sdk/identity/identity/README.md
|
||||
[defaultazurecredential]: https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/identity/identity#defaultazurecredential
|
||||
[sbclient]: https://docs.microsoft.com/javascript/api/%40azure/service-bus/servicebusclient?view=azure-node-preview
|
||||
[sbclient_constructor]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#servicebusclient-string--servicebusclientoptions-
|
||||
[sbclient_tokencred_overload]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#servicebusclient-string--tokencredential--servicebusclientoptions-
|
||||
[sbclient_createsender]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#createsender-string-
|
||||
[sbclient_createreceiver]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#createreceiver-string--createreceiveroptions--peeklock---
|
||||
[sbclient_createsessionreceiver]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#createsessionreceiver-string--createsessionreceiveroptions--peeklock---
|
||||
[sender]: https://docs.microsoft.com/javascript/api/@azure/service-bus/sender?view=azure-node-preview
|
||||
[sender_send]: https://docs.microsoft.com/javascript/api/@azure/service-bus/sender?view=azure-node-preview#sendmessages-servicebusmessage---servicebusmessage-----servicebusmessagebatch--operationoptionsbase-
|
||||
[receiver]: https://docs.microsoft.com/javascript/api/@azure/service-bus/receiver?view=azure-node-preview
|
||||
[receiverreceivebatch]: https://docs.microsoft.com/javascript/api/@azure/service-bus/receiver?view=azure-node-preview#receivemessages-number--receivemessagesoptions-
|
||||
[receiver_subscribe]: https://docs.microsoft.com/javascript/api/@azure/service-bus/receiver?view=azure-node-preview#subscribe-messagehandlers-receivedmessaget---subscribeoptions-
|
||||
[receiver_getmessageiterator]: https://docs.microsoft.com/javascript/api/@azure/service-bus/receiver?view=azure-node-preview#getmessageiterator-getmessageiteratoroptions-
|
||||
[sessionreceiver]: https://docs.microsoft.com/javascript/api/@azure/service-bus/sessionreceiver?view=azure-node-preview
|
||||
[migrationguide]: https://github.com/Azure/azure-sdk-for-js/blob/fb53a838e702a075c4db6f1d4a17849a271342df/sdk/servicebus/service-bus/migrationguide.md
|
||||
[sbclient_constructor]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#ServiceBusClient_string__ServiceBusClientOptions_
|
||||
[sbclient_tokencred_overload]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#ServiceBusClient_string__TokenCredential__ServiceBusClientOptions_
|
||||
[sbclient_createsender]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#createSender_string_
|
||||
[sbclient_createreceiver]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#createReceiver_string__CreateReceiverOptions__peekLock___
|
||||
[sbclient_acceptsession]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusclient?view=azure-node-preview#acceptSession_string__string__AcceptSessionOptions__peekLock___
|
||||
[sender]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebussender?view=azure-node-preview
|
||||
[sender_sendmessages]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebussender?view=azure-node-preview#sendMessages_ServiceBusMessage___ServiceBusMessage_____ServiceBusMessageBatch__OperationOptionsBase_
|
||||
[receiver]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusreceiver?view=azure-node-preview
|
||||
[receiver_receivemessages]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusreceiver?view=azure-node-preview#receiveMessages_number__ReceiveMessagesOptions_
|
||||
[receiver_subscribe]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusreceiver?view=azure-node-preview#subscribe_MessageHandlers_ReceivedMessageT___SubscribeOptions_
|
||||
[receiver_getmessageiterator]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebusreceiver?view=azure-node-preview#getMessageIterator_GetMessageIteratorOptions_
|
||||
[sessionreceiver]: https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebussessionreceiver?view=azure-node-preview
|
||||
[migrationguide]: https://github.com/Azure/azure-sdk-for-js/blob/master/sdk/servicebus/service-bus/migrationguide.md
|
||||
[docsms_messagesessions]: https://docs.microsoft.com/azure/service-bus-messaging/message-sessions
|
||||
[docsms_messagesessions_fifo]: https://docs.microsoft.com/azure/service-bus-messaging/message-sessions#first-in-first-out-fifo-pattern
|
||||
[queue_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#queues
|
||||
|
|
|
@ -101,8 +101,9 @@ brings this package in line with the [Azure SDK Design Guidelines for Typescript
|
|||
queueOrSubscriptionReceiver.subscribe({
|
||||
processMessage: onMessageFn,
|
||||
// `processError` is now declared as async and should return a promise.
|
||||
processError: async (err) => {
|
||||
onErrorFn(err);
|
||||
processError: async (args: ProcessErrorArgs) => {
|
||||
// additional information is in 'args' to provide context for the error.
|
||||
onErrorFn(args.error);
|
||||
}
|
||||
});
|
||||
```
|
||||
|
|
|
@ -193,7 +193,7 @@ export interface GetMessageIteratorOptions extends OperationOptionsBase {
|
|||
|
||||
// @public
|
||||
export interface MessageHandlers<ReceivedMessageT> {
|
||||
processError(err: Error): Promise<void>;
|
||||
processError(args: ProcessErrorArgs): Promise<void>;
|
||||
processMessage(message: ReceivedMessageT): Promise<void>;
|
||||
}
|
||||
|
||||
|
@ -223,6 +223,14 @@ export interface PeekMessagesOptions extends OperationOptionsBase {
|
|||
fromSequenceNumber?: Long;
|
||||
}
|
||||
|
||||
// @public
|
||||
export interface ProcessErrorArgs {
|
||||
entityPath: string;
|
||||
error: Error | MessagingError;
|
||||
errorSource: "abandon" | "complete" | "processMessageCallback" | "receive" | "renewLock";
|
||||
fullyQualifiedNamespace: string;
|
||||
}
|
||||
|
||||
// @public
|
||||
export interface QueueProperties {
|
||||
authorizationRules?: AuthorizationRule[];
|
||||
|
|
|
@ -103,8 +103,8 @@ async function receiveMessage() {
|
|||
await brokeredMessage.deadLetter();
|
||||
}
|
||||
};
|
||||
const processError = async (err) => {
|
||||
console.log(">>>>> Error occurred: ", err);
|
||||
const processError = async (args) => {
|
||||
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
|
||||
};
|
||||
receiver.subscribe(
|
||||
{ processMessage, processError },
|
||||
|
|
|
@ -103,8 +103,8 @@ async function receiveFromNextSession(serviceBusClient) {
|
|||
refreshTimer();
|
||||
await processMessage(msg);
|
||||
},
|
||||
async processError(err) {
|
||||
rejectSessionWithError(err);
|
||||
async processError(args) {
|
||||
rejectSessionWithError(args.error);
|
||||
}
|
||||
},
|
||||
{
|
||||
|
|
|
@ -30,8 +30,8 @@ async function main() {
|
|||
await brokeredMessage.complete();
|
||||
};
|
||||
|
||||
const processError = async (err) => {
|
||||
console.log("Error occurred: ", err);
|
||||
const processError = async (args) => {
|
||||
console.log(`Error from error source ${args.errorSource} occurred: `, args.error);
|
||||
};
|
||||
|
||||
try {
|
||||
|
|
|
@ -75,8 +75,8 @@ async function receiveMessages(sbClient) {
|
|||
console.log(`Received message: ${brokeredMessage.body} - ${brokeredMessage.label}`);
|
||||
await brokeredMessage.complete();
|
||||
};
|
||||
const processError = async (err) => {
|
||||
console.log("Error occurred: ", err);
|
||||
const processError = async (args) => {
|
||||
console.log(`Error from error source ${args.errorSource} occurred: `, args.error);
|
||||
};
|
||||
|
||||
console.log(`\nStarting receiver immediately at ${new Date(Date.now())}`);
|
||||
|
|
|
@ -81,8 +81,8 @@ async function receiveMessages(sbClient, sessionId) {
|
|||
const processMessage = async (message) => {
|
||||
console.log(`Received: ${message.sessionId} - ${message.body} `);
|
||||
};
|
||||
const processError = async (err) => {
|
||||
console.log(">>>>> Error occurred: ", err);
|
||||
const processError = async (args) => {
|
||||
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
|
||||
};
|
||||
|
||||
receiver.subscribe({
|
||||
|
|
|
@ -105,8 +105,8 @@ async function receiveMessage() {
|
|||
await brokeredMessage.deadLetter();
|
||||
}
|
||||
};
|
||||
const processError = async (err) => {
|
||||
console.log(">>>>> Error occurred: ", err);
|
||||
const processError = async (args) => {
|
||||
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
|
||||
};
|
||||
|
||||
receiver.subscribe(
|
||||
|
|
|
@ -112,8 +112,8 @@ async function receiveFromNextSession(serviceBusClient: ServiceBusClient): Promi
|
|||
refreshTimer();
|
||||
await processMessage(msg);
|
||||
},
|
||||
async processError(err) {
|
||||
rejectSessionWithError(err);
|
||||
async processError(args) {
|
||||
rejectSessionWithError(args.error);
|
||||
}
|
||||
},
|
||||
{
|
||||
|
|
|
@ -34,8 +34,8 @@ export async function main() {
|
|||
console.log(`Received message: ${brokeredMessage.body}`);
|
||||
await brokeredMessage.complete();
|
||||
};
|
||||
const processError = async (err) => {
|
||||
console.log("Error occurred: ", err);
|
||||
const processError = async (args) => {
|
||||
console.log(`Error from error source ${args.errorSource} occurred: `, args.error);
|
||||
};
|
||||
|
||||
try {
|
||||
|
|
|
@ -78,8 +78,8 @@ async function receiveMessages(sbClient: ServiceBusClient) {
|
|||
console.log(`Received message: ${brokeredMessage.body} - ${brokeredMessage.label}`);
|
||||
await brokeredMessage.complete();
|
||||
};
|
||||
const processError = async (err) => {
|
||||
console.log("Error occurred: ", err);
|
||||
const processError = async (args) => {
|
||||
console.log(`Error from error source ${args.errorSource} occurred: `, args.error);
|
||||
};
|
||||
|
||||
console.log(`\nStarting receiver immediately at ${new Date(Date.now())}`);
|
||||
|
|
|
@ -85,8 +85,8 @@ async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) {
|
|||
const processMessage = async (message: ServiceBusMessage) => {
|
||||
console.log(`Received: ${message.sessionId} - ${message.body} `);
|
||||
};
|
||||
const processError = async (err) => {
|
||||
console.log(">>>>> Error occurred: ", err);
|
||||
const processError = async (args) => {
|
||||
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
|
||||
};
|
||||
receiver.subscribe({
|
||||
processMessage,
|
||||
|
|
|
@ -6,7 +6,7 @@ import { receiverLogger as logger } from "../log";
|
|||
import { ServiceBusMessageImpl } from "../serviceBusMessage";
|
||||
import { calculateRenewAfterDuration } from "../util/utils";
|
||||
import { LinkEntity } from "./linkEntity";
|
||||
import { OnError } from "./messageReceiver";
|
||||
import { OnErrorNoContext } from "./messageReceiver";
|
||||
|
||||
/**
|
||||
* @internal
|
||||
|
@ -118,7 +118,7 @@ export class LockRenewer {
|
|||
*
|
||||
* @param bMessage The message whose lock renewal we will start.
|
||||
*/
|
||||
start(linkEntity: MinimalLink, bMessage: RenewableMessageProperties, onError: OnError) {
|
||||
start(linkEntity: MinimalLink, bMessage: RenewableMessageProperties, onError: OnErrorNoContext) {
|
||||
try {
|
||||
const logPrefix = linkEntity.logPrefix;
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ import { LinkEntity, ReceiverType } from "./linkEntity";
|
|||
import { ConnectionContext } from "../connectionContext";
|
||||
import { DispositionType, ServiceBusMessageImpl } from "../serviceBusMessage";
|
||||
import { getUniqueName } from "../util/utils";
|
||||
import { ReceiveMode, SubscribeOptions } from "../models";
|
||||
import { ProcessErrorArgs, ReceiveMode, SubscribeOptions } from "../models";
|
||||
import { DispositionStatusOptions } from "./managementClient";
|
||||
import { AbortSignalLike } from "@azure/core-http";
|
||||
import { onMessageSettled, DeferredPromiseAndTimer } from "./shared";
|
||||
|
@ -80,7 +80,21 @@ export interface OnMessage {
|
|||
export interface OnError {
|
||||
/**
|
||||
* Handler for any error that occurs while receiving or processing messages.
|
||||
*
|
||||
* NOTE: if this signature changes make sure you reflect those same changes in the
|
||||
* `OnErrorNoContext` definition below.
|
||||
*/
|
||||
(args: ProcessErrorArgs): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* An onError method but without the context property. Used when wrapping OnError
|
||||
* with an implicit ProcessErrorContext. Used by LockRenewer.
|
||||
*
|
||||
* @internal
|
||||
* @ignore
|
||||
*/
|
||||
export interface OnErrorNoContext {
|
||||
(error: MessagingError | Error): void;
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ import {
|
|||
} from "@azure/core-amqp";
|
||||
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
|
||||
import { receiverLogger as logger } from "../log";
|
||||
import { AmqpError, EventContext, isAmqpError, OnAmqpEvent } from "rhea-promise";
|
||||
import { AmqpError, EventContext, OnAmqpEvent } from "rhea-promise";
|
||||
import { ServiceBusMessageImpl } from "../serviceBusMessage";
|
||||
import { AbortSignalLike } from "@azure/abort-controller";
|
||||
|
||||
|
@ -199,7 +199,12 @@ export class StreamingReceiver extends MessageReceiver {
|
|||
"retryable, we let the user know about it by calling the user's error handler.",
|
||||
this.logPrefix
|
||||
);
|
||||
this._onError!(sbError);
|
||||
this._onError!({
|
||||
error: sbError,
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
} else {
|
||||
logger.verbose(
|
||||
"%s The received error is not retryable. However, the receiver was " +
|
||||
|
@ -234,7 +239,12 @@ export class StreamingReceiver extends MessageReceiver {
|
|||
"retryable, we let the user know about it by calling the user's error handler.",
|
||||
this.logPrefix
|
||||
);
|
||||
this._onError!(sbError);
|
||||
this._onError!({
|
||||
error: sbError,
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -262,25 +272,32 @@ export class StreamingReceiver extends MessageReceiver {
|
|||
|
||||
this._lockRenewer?.start(this, bMessage, (err) => {
|
||||
if (this._onError) {
|
||||
this._onError(err);
|
||||
this._onError({
|
||||
error: err,
|
||||
errorSource: "renewLock",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
await this._onMessage(bMessage);
|
||||
} catch (err) {
|
||||
// This ensures we call users' error handler when users' message handler throws.
|
||||
if (!isAmqpError(err)) {
|
||||
logger.logError(
|
||||
err,
|
||||
"%s An error occurred while running user's message handler for the message " +
|
||||
"with id '%s' on the receiver '%s'",
|
||||
this.logPrefix,
|
||||
bMessage.messageId,
|
||||
this.name
|
||||
);
|
||||
this._onError!(err);
|
||||
}
|
||||
logger.logError(
|
||||
err,
|
||||
"%s An error occurred while running user's message handler for the message " +
|
||||
"with id '%s' on the receiver '%s'",
|
||||
this.logPrefix,
|
||||
bMessage.messageId,
|
||||
this.name
|
||||
);
|
||||
this._onError!({
|
||||
error: err,
|
||||
errorSource: "processMessageCallback",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
|
||||
// Do not want renewLock to happen unnecessarily, while abandoning the message. Hence,
|
||||
// doing this here. Otherwise, this should be done in finally.
|
||||
|
@ -314,7 +331,12 @@ export class StreamingReceiver extends MessageReceiver {
|
|||
bMessage.messageId,
|
||||
this.name
|
||||
);
|
||||
this._onError!(translatedError);
|
||||
this._onError!({
|
||||
error: translatedError,
|
||||
errorSource: "abandon",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
}
|
||||
return;
|
||||
|
@ -346,7 +368,12 @@ export class StreamingReceiver extends MessageReceiver {
|
|||
bMessage.messageId,
|
||||
this.name
|
||||
);
|
||||
this._onError!(translatedError);
|
||||
this._onError!({
|
||||
error: translatedError,
|
||||
errorSource: "complete",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -513,7 +540,12 @@ export class StreamingReceiver extends MessageReceiver {
|
|||
if (typeof this._onError === "function") {
|
||||
logger.verbose(`${this.logPrefix} Unable to automatically reconnect`);
|
||||
try {
|
||||
this._onError(err);
|
||||
this._onError({
|
||||
error: err,
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
} catch (err) {
|
||||
logger.logError(
|
||||
err,
|
||||
|
|
|
@ -22,6 +22,7 @@ export {
|
|||
AcceptSessionOptions,
|
||||
GetMessageIteratorOptions,
|
||||
MessageHandlers,
|
||||
ProcessErrorArgs,
|
||||
PeekMessagesOptions,
|
||||
ReceiveMessagesOptions,
|
||||
ReceiveMode,
|
||||
|
|
|
@ -3,6 +3,35 @@
|
|||
|
||||
import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
|
||||
import Long from "long";
|
||||
import { MessagingError } from "@azure/core-amqp";
|
||||
|
||||
/**
|
||||
* Arguments to the `processError` callback.
|
||||
*/
|
||||
export interface ProcessErrorArgs {
|
||||
/**
|
||||
* The error.
|
||||
*/
|
||||
error: Error | MessagingError;
|
||||
/**
|
||||
* The operation where the error originated.
|
||||
*
|
||||
* 'abandon': Errors that occur when if `abandon` is triggered automatically.
|
||||
* 'complete': Errors that occur when autoComplete completes a message.
|
||||
* 'processMessageCallback': Errors thrown from the user's `processMessage` callback passed to `subscribe`.
|
||||
* 'receive': Errors thrown when receiving messages.
|
||||
* 'renewLock': Errors thrown when automatic lock renewal fails.
|
||||
*/
|
||||
errorSource: "abandon" | "complete" | "processMessageCallback" | "receive" | "renewLock";
|
||||
/**
|
||||
* The entity path for the current receiver.
|
||||
*/
|
||||
entityPath: string;
|
||||
/**
|
||||
* The fully qualified namespace for the Service Bus.
|
||||
*/
|
||||
fullyQualifiedNamespace: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* The general message handler interface (used for streamMessages).
|
||||
|
@ -16,9 +45,10 @@ export interface MessageHandlers<ReceivedMessageT> {
|
|||
processMessage(message: ReceivedMessageT): Promise<void>;
|
||||
/**
|
||||
* Handler that processes errors that occur during receiving.
|
||||
* @param err An error from Service Bus.
|
||||
* @param args The error and additional context to indicate where
|
||||
* the error originated.
|
||||
*/
|
||||
processError(err: Error): Promise<void>;
|
||||
processError(args: ProcessErrorArgs): Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -10,7 +10,7 @@ import {
|
|||
InternalMessageHandlers
|
||||
} from "../models";
|
||||
import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs";
|
||||
import { ServiceBusReceivedMessage } from "..";
|
||||
import { ServiceBusReceivedMessage } from "../serviceBusMessage";
|
||||
import { ConnectionContext } from "../connectionContext";
|
||||
import {
|
||||
getAlreadyReceivingErrorMsg,
|
||||
|
@ -262,20 +262,30 @@ export class ServiceBusReceiverImpl<
|
|||
try {
|
||||
await onInitialize();
|
||||
} catch (err) {
|
||||
onError(err);
|
||||
onError({
|
||||
error: err,
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
|
||||
if (!this.isClosed) {
|
||||
sReceiver.subscribe(async (message) => {
|
||||
await onMessage(message);
|
||||
}, onError);
|
||||
sReceiver.subscribe(onMessage, onError);
|
||||
} else {
|
||||
await sReceiver.close();
|
||||
}
|
||||
return;
|
||||
})
|
||||
.catch((err) => {
|
||||
onError(err);
|
||||
// TODO: being a bit broad here but the only errors that should filter out this
|
||||
// far are going to be bootstrapping the subscription.
|
||||
onError({
|
||||
error: err,
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -472,7 +472,12 @@ export class ServiceBusSessionReceiverImpl<
|
|||
try {
|
||||
this._messageSession.subscribe(onMessage, onError, options);
|
||||
} catch (err) {
|
||||
onError(err);
|
||||
onError({
|
||||
error: err,
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
import { MessageHandlers } from "../models";
|
||||
import { MessageHandlers, ProcessErrorArgs } from "../models";
|
||||
import { ServiceBusReceiver } from "./receiver";
|
||||
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
|
||||
import { receiverLogger, ServiceBusLogger } from "../log";
|
||||
|
@ -49,9 +49,9 @@ export function wrapProcessErrorHandler(
|
|||
handlers: Pick<MessageHandlers<unknown>, "processError">,
|
||||
logger: ServiceBusLogger = receiverLogger
|
||||
): MessageHandlers<unknown>["processError"] {
|
||||
return async (err: Error) => {
|
||||
return async (args: ProcessErrorArgs) => {
|
||||
try {
|
||||
await handlers.processError(err);
|
||||
await handlers.processError(args);
|
||||
} catch (err) {
|
||||
logger.logError(err, `An error was thrown from the user's processError handler`);
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import { Constants, ErrorNameConditionMapper, MessagingError, translate } from "
|
|||
import {
|
||||
AmqpError,
|
||||
EventContext,
|
||||
isAmqpError,
|
||||
OnAmqpEvent,
|
||||
Receiver,
|
||||
ReceiverEvents,
|
||||
|
@ -27,7 +26,7 @@ import { BatchingReceiverLite, MinimalReceiver } from "../core/batchingReceiver"
|
|||
import { onMessageSettled, DeferredPromiseAndTimer } from "../core/shared";
|
||||
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
|
||||
import { ReceiverHelper } from "../core/receiverHelper";
|
||||
import { AcceptSessionOptions, ReceiveMode, SubscribeOptions } from "../models";
|
||||
import { AcceptSessionOptions, ProcessErrorArgs, ReceiveMode, SubscribeOptions } from "../models";
|
||||
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
|
||||
|
||||
/**
|
||||
|
@ -393,9 +392,9 @@ export class MessageSession extends LinkEntity<Receiver> {
|
|||
onMessageSettled(this.logPrefix, delivery, this._deliveryDispositionMap);
|
||||
};
|
||||
|
||||
this._notifyError = (error: MessagingError | Error) => {
|
||||
this._notifyError = (args: ProcessErrorArgs) => {
|
||||
if (this._onError) {
|
||||
this._onError(error);
|
||||
this._onError(args);
|
||||
logger.verbose(
|
||||
"%s Notified the user's error handler about the error received by the Receiver",
|
||||
this.logPrefix
|
||||
|
@ -411,7 +410,12 @@ export class MessageSession extends LinkEntity<Receiver> {
|
|||
sbError.message = `The session lock has expired on the session with id ${this.sessionId}.`;
|
||||
}
|
||||
logger.logError(sbError, "%s An error occurred for Receiver", this.logPrefix);
|
||||
this._notifyError(sbError);
|
||||
this._notifyError({
|
||||
error: sbError,
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -427,7 +431,12 @@ export class MessageSession extends LinkEntity<Receiver> {
|
|||
this.name,
|
||||
sbError
|
||||
);
|
||||
this._notifyError(sbError);
|
||||
this._notifyError({
|
||||
error: sbError,
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -616,17 +625,19 @@ export class MessageSession extends LinkEntity<Receiver> {
|
|||
try {
|
||||
await this._onMessage(bMessage);
|
||||
} catch (err) {
|
||||
// This ensures we call users' error handler when users' message handler throws.
|
||||
if (!isAmqpError(err)) {
|
||||
logger.logError(
|
||||
err,
|
||||
"%s An error occurred while running user's message handler for the message " +
|
||||
"with id '%s' on the receiver",
|
||||
this.logPrefix,
|
||||
bMessage.messageId
|
||||
);
|
||||
this._onError!(err);
|
||||
}
|
||||
logger.logError(
|
||||
err,
|
||||
"%s An error occurred while running user's message handler for the message " +
|
||||
"with id '%s' on the receiver",
|
||||
this.logPrefix,
|
||||
bMessage.messageId
|
||||
);
|
||||
this._onError!({
|
||||
error: err,
|
||||
errorSource: "processMessageCallback",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
|
||||
const error = translate(err);
|
||||
// Nothing much to do if user's message handler throws. Let us try abandoning the message.
|
||||
|
@ -653,7 +664,12 @@ export class MessageSession extends LinkEntity<Receiver> {
|
|||
bMessage.messageId,
|
||||
translatedError
|
||||
);
|
||||
this._notifyError(translatedError);
|
||||
this._notifyError({
|
||||
error: translatedError,
|
||||
errorSource: "abandon",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
}
|
||||
return;
|
||||
|
@ -683,7 +699,12 @@ export class MessageSession extends LinkEntity<Receiver> {
|
|||
this.logPrefix,
|
||||
bMessage.messageId
|
||||
);
|
||||
this._notifyError(translatedError);
|
||||
this._notifyError({
|
||||
error: translatedError,
|
||||
errorSource: "complete",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -697,7 +718,20 @@ export class MessageSession extends LinkEntity<Receiver> {
|
|||
`MessageSession with sessionId '${this.sessionId}' and name '${this.name}' ` +
|
||||
`has either not been created or is not open.`;
|
||||
logger.verbose("[%s] %s", this._context.connectionId, msg);
|
||||
this._notifyError(new Error(msg));
|
||||
this._notifyError({
|
||||
error: new Error(msg),
|
||||
// This is _probably_ the right error code since we require that
|
||||
// the message session is created before we even give back the receiver. So it not
|
||||
// being open at this point is either:
|
||||
//
|
||||
// 1. we didn't acquire the lock
|
||||
// 2. the connection was broken (we don't reconnect)
|
||||
//
|
||||
// If any of these becomes untrue you'll probably want to re-evaluate this classification.
|
||||
errorSource: "receive",
|
||||
entityPath: this.entityPath,
|
||||
fullyQualifiedNamespace: this._context.config.host
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
import chai from "chai";
|
||||
import Long from "long";
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
import { ServiceBusMessage, delay } from "../src";
|
||||
import { ServiceBusMessage, delay, ProcessErrorArgs } from "../src";
|
||||
import { getAlreadyReceivingErrorMsg } from "../src/util/errors";
|
||||
import { TestClientType, TestMessage } from "./utils/testUtils";
|
||||
import { ServiceBusReceiver, ServiceBusReceiverImpl } from "../src/receivers/receiver";
|
||||
|
@ -595,8 +595,8 @@ describe("Batching Receiver", () => {
|
|||
async processMessage(): Promise<void> {
|
||||
// process message here - it's basically a ServiceBusMessage minus any settlement related methods
|
||||
},
|
||||
async processError(err: Error): Promise<void> {
|
||||
unexpectedError = err;
|
||||
async processError(args: ProcessErrorArgs): Promise<void> {
|
||||
unexpectedError = args.error;
|
||||
}
|
||||
});
|
||||
} catch (err) {
|
||||
|
|
|
@ -24,7 +24,7 @@ import { isLinkLocked } from "../utils/misc";
|
|||
import { ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver";
|
||||
import { ServiceBusReceiverImpl } from "../../src/receivers/receiver";
|
||||
import { MessageSession } from "../../src/session/messageSession";
|
||||
import { ReceiveMode } from "../../src";
|
||||
import { ProcessErrorArgs, ReceiveMode } from "../../src";
|
||||
|
||||
describe("AbortSignal", () => {
|
||||
const defaultOptions = {
|
||||
|
@ -354,8 +354,8 @@ describe("AbortSignal", () => {
|
|||
session.subscribe(
|
||||
{
|
||||
processMessage: async (_msg) => {},
|
||||
processError: async (err) => {
|
||||
receivedErrors.push(err);
|
||||
processError: async (args) => {
|
||||
receivedErrors.push(args.error);
|
||||
}
|
||||
},
|
||||
{
|
||||
|
@ -386,9 +386,9 @@ describe("AbortSignal", () => {
|
|||
receiver.subscribe(
|
||||
{
|
||||
processMessage: async (_msg: any) => {},
|
||||
processError: async (err: Error) => {
|
||||
processError: async (args: ProcessErrorArgs) => {
|
||||
resolve();
|
||||
receivedErrors.push(err);
|
||||
receivedErrors.push(args.error);
|
||||
}
|
||||
},
|
||||
{
|
||||
|
|
|
@ -4,7 +4,11 @@
|
|||
import chai from "chai";
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
import { MessageSession } from "../../src/session/messageSession";
|
||||
import { createConnectionContextForTests, defer } from "./unittestUtils";
|
||||
import {
|
||||
createConnectionContextForTests,
|
||||
createConnectionContextForTestsWithSessionId,
|
||||
defer
|
||||
} from "./unittestUtils";
|
||||
import sinon from "sinon";
|
||||
import { EventEmitter } from "events";
|
||||
import {
|
||||
|
@ -16,7 +20,8 @@ import {
|
|||
} from "rhea-promise";
|
||||
import { OnAmqpEventAsPromise } from "../../src/core/messageReceiver";
|
||||
import { ServiceBusMessageImpl } from "../../src/serviceBusMessage";
|
||||
import { ReceiveMode } from "../../src";
|
||||
import { ProcessErrorArgs, ReceiveMode } from "../../src";
|
||||
import { Constants } from "@azure/core-amqp";
|
||||
|
||||
chai.use(chaiAsPromised);
|
||||
const assert = chai.assert;
|
||||
|
@ -335,4 +340,71 @@ describe("Message session unit tests", () => {
|
|||
};
|
||||
}
|
||||
});
|
||||
|
||||
describe("errorSource", () => {
|
||||
it("errors thrown from the user's callback are marked as 'processMessageCallback' errors", async () => {
|
||||
const messageSession = await MessageSession.create(
|
||||
createConnectionContextForTestsWithSessionId("session id"),
|
||||
"entity path",
|
||||
"session id",
|
||||
{
|
||||
receiveMode: "receiveAndDelete"
|
||||
}
|
||||
);
|
||||
|
||||
try {
|
||||
let errorArgs: ProcessErrorArgs | undefined;
|
||||
|
||||
let eventContext = {
|
||||
delivery: {},
|
||||
message: {
|
||||
message_annotations: {
|
||||
[Constants.enqueuedTime]: new Date()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const subscribePromise = new Promise<void>((resolve) => {
|
||||
messageSession.subscribe(
|
||||
async (_message) => {
|
||||
throw new Error("Error thrown from the user's processMessage callback");
|
||||
},
|
||||
async (args) => {
|
||||
errorArgs = args;
|
||||
resolve();
|
||||
},
|
||||
{}
|
||||
);
|
||||
});
|
||||
|
||||
messageSession["_link"]?.emit(
|
||||
ReceiverEvents.message,
|
||||
(eventContext as any) as EventContext
|
||||
);
|
||||
|
||||
// once we emit the event we need to wait for it to be processed (and then in turn
|
||||
// generate an error)
|
||||
await subscribePromise;
|
||||
|
||||
assert.exists(errorArgs, "We should have triggered processError.");
|
||||
|
||||
assert.deepEqual(
|
||||
{
|
||||
message: errorArgs!.error.message,
|
||||
errorSource: errorArgs!.errorSource,
|
||||
entityPath: errorArgs!.entityPath,
|
||||
fullyQualifiedNamespace: errorArgs!.fullyQualifiedNamespace
|
||||
},
|
||||
{
|
||||
message: "Error thrown from the user's processMessage callback",
|
||||
errorSource: "processMessageCallback",
|
||||
entityPath: "entity path",
|
||||
fullyQualifiedNamespace: "fakeHost"
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
await messageSession.close();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -15,7 +15,7 @@ import {
|
|||
createConnectionContextForTests,
|
||||
createConnectionContextForTestsWithSessionId
|
||||
} from "./unittestUtils";
|
||||
import { InternalMessageHandlers } from "../../src/models";
|
||||
import { InternalMessageHandlers, ProcessErrorArgs } from "../../src/models";
|
||||
import { createAbortSignalForTest } from "../utils/abortSignalTestUtils";
|
||||
import { AbortSignalLike } from "@azure/abort-controller";
|
||||
import { ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver";
|
||||
|
@ -226,6 +226,43 @@ describe("Receiver unit tests", () => {
|
|||
await receiverImpl.close();
|
||||
});
|
||||
|
||||
it("errors thrown when initializing a connection are reported as 'receive' errors", async () => {
|
||||
const receiverImpl = new ServiceBusReceiverImpl(
|
||||
createConnectionContextForTests({
|
||||
onCreateReceiverCalled: () => {
|
||||
throw new Error("Failed to initialize!");
|
||||
}
|
||||
}),
|
||||
"fakeEntityPath",
|
||||
"peekLock",
|
||||
1
|
||||
);
|
||||
|
||||
const processErrorArgs = await new Promise<ProcessErrorArgs>((resolve) => {
|
||||
return receiverImpl.subscribe({
|
||||
processError: async (args) => {
|
||||
resolve(args);
|
||||
},
|
||||
processMessage: async (_msg) => {}
|
||||
});
|
||||
});
|
||||
|
||||
assert.deepEqual(
|
||||
{
|
||||
message: processErrorArgs.error.message,
|
||||
errorSource: processErrorArgs.errorSource,
|
||||
entityPath: processErrorArgs.entityPath,
|
||||
fullyQualifiedNamespace: processErrorArgs.fullyQualifiedNamespace
|
||||
},
|
||||
{
|
||||
message: "Failed to initialize!",
|
||||
errorSource: "receive",
|
||||
entityPath: "fakeEntityPath",
|
||||
fullyQualifiedNamespace: "fakeHost"
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
async function subscribeAndWaitForInitialize<
|
||||
T extends ServiceBusReceivedMessage | ServiceBusReceivedMessageWithLock
|
||||
>(receiver: ServiceBusReceiverImpl<T>): Promise<ReturnType<typeof receiver["subscribe"]>> {
|
||||
|
|
|
@ -5,6 +5,7 @@ import { getMessageIterator, wrapProcessErrorHandler } from "../../src/receivers
|
|||
import chai from "chai";
|
||||
import { ServiceBusReceiver } from "../../src/receivers/receiver";
|
||||
import { ServiceBusLogger } from "../../src/log";
|
||||
import { ProcessErrorArgs } from "../../src/models";
|
||||
const assert = chai.assert;
|
||||
|
||||
describe("shared", () => {
|
||||
|
@ -13,8 +14,22 @@ describe("shared", () => {
|
|||
|
||||
const wrappedProcessError = wrapProcessErrorHandler(
|
||||
{
|
||||
processError: (err) => {
|
||||
assert.equal(err.message, "Actual error that was passed in from service bus to the user");
|
||||
processError: (args: ProcessErrorArgs) => {
|
||||
assert.deepEqual(
|
||||
{
|
||||
message: args.error.message,
|
||||
fullyQualifiedNamespace: args.fullyQualifiedNamespace,
|
||||
entityPath: args.entityPath,
|
||||
errorSource: args.errorSource
|
||||
},
|
||||
{
|
||||
message: "Actual error that was passed in from service bus to the user",
|
||||
fullyQualifiedNamespace: "fully qualified namespace",
|
||||
entityPath: "entity path",
|
||||
errorSource: "renewLock"
|
||||
}
|
||||
);
|
||||
|
||||
throw new Error("Whoops!");
|
||||
}
|
||||
},
|
||||
|
@ -29,7 +44,12 @@ describe("shared", () => {
|
|||
} as ServiceBusLogger
|
||||
);
|
||||
|
||||
wrappedProcessError(new Error("Actual error that was passed in from service bus to the user"));
|
||||
wrappedProcessError({
|
||||
error: new Error("Actual error that was passed in from service bus to the user"),
|
||||
entityPath: "entity path",
|
||||
errorSource: "renewLock",
|
||||
fullyQualifiedNamespace: "fully qualified namespace"
|
||||
});
|
||||
|
||||
assert.isTrue(logErrorCalled, "log error should have been called");
|
||||
});
|
||||
|
|
|
@ -7,10 +7,12 @@ import { ServiceBusReceiverImpl } from "../../src/receivers/receiver";
|
|||
import { createConnectionContextForTests, getPromiseResolverForTest } from "./unittestUtils";
|
||||
import { ConnectionContext } from "../../src/connectionContext";
|
||||
import { ReceiveOptions } from "../../src/core/messageReceiver";
|
||||
import { OperationOptions, ReceiveMode } from "../../src";
|
||||
import { OperationOptions, ProcessErrorArgs, ReceiveMode } from "../../src";
|
||||
import { StreamingReceiver } from "../../src/core/streamingReceiver";
|
||||
import { AbortController, AbortSignalLike } from "@azure/abort-controller";
|
||||
import sinon from "sinon";
|
||||
import { EventContext } from "rhea-promise";
|
||||
import { Constants } from "@azure/core-amqp";
|
||||
|
||||
chai.use(chaiAsPromised);
|
||||
const assert = chai.assert;
|
||||
|
@ -324,8 +326,8 @@ describe("StreamingReceiver unit tests", () => {
|
|||
receiverImpl.subscribe(
|
||||
{
|
||||
processMessage: async () => {},
|
||||
processError: async (err) => {
|
||||
errors.push(err.message);
|
||||
processError: async (args) => {
|
||||
errors.push(args.error.message);
|
||||
}
|
||||
},
|
||||
{
|
||||
|
@ -364,4 +366,58 @@ describe("StreamingReceiver unit tests", () => {
|
|||
assert.isTrue(wasCalled);
|
||||
});
|
||||
});
|
||||
|
||||
describe("errorSource", () => {
|
||||
it("errors thrown from the user's callback are marked as 'processMessageCallback' errors", async () => {
|
||||
const streamingReceiver = await StreamingReceiver.create(
|
||||
createConnectionContextForTests(),
|
||||
"entity path",
|
||||
{
|
||||
lockRenewer: undefined,
|
||||
receiveMode: "receiveAndDelete"
|
||||
}
|
||||
);
|
||||
|
||||
try {
|
||||
let args: ProcessErrorArgs | undefined;
|
||||
|
||||
let eventContext = {
|
||||
delivery: {},
|
||||
message: {
|
||||
message_annotations: {
|
||||
[Constants.enqueuedTime]: new Date()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
streamingReceiver.subscribe(
|
||||
async (_message) => {
|
||||
throw new Error("Error thrown from the user's processMessage callback");
|
||||
},
|
||||
async (_args) => {
|
||||
args = _args;
|
||||
}
|
||||
);
|
||||
|
||||
await streamingReceiver["_onAmqpMessage"]((eventContext as any) as EventContext);
|
||||
|
||||
assert.deepEqual(
|
||||
{
|
||||
message: args?.error.message,
|
||||
errorSource: args?.errorSource,
|
||||
entityPath: args?.entityPath,
|
||||
fullyQualifiedNamespace: args?.fullyQualifiedNamespace
|
||||
},
|
||||
{
|
||||
message: "Error thrown from the user's processMessage callback",
|
||||
errorSource: "processMessageCallback",
|
||||
entityPath: "entity path",
|
||||
fullyQualifiedNamespace: "fakeHost"
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
await streamingReceiver.close();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -6,7 +6,12 @@ const should = chai.should();
|
|||
const expect = chai.expect;
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
chai.use(chaiAsPromised);
|
||||
import { ServiceBusReceivedMessage, ServiceBusMessage, ServiceBusReceiver } from "../src";
|
||||
import {
|
||||
ServiceBusReceivedMessage,
|
||||
ServiceBusMessage,
|
||||
ServiceBusReceiver,
|
||||
ProcessErrorArgs
|
||||
} from "../src";
|
||||
|
||||
import { TestClientType, TestMessage, checkWithTimeout } from "./utils/testUtils";
|
||||
|
||||
|
@ -130,8 +135,8 @@ describe("receive and delete", () => {
|
|||
async processMessage(message: ServiceBusReceivedMessage): Promise<void> {
|
||||
receivedMsgs.push(message);
|
||||
},
|
||||
async processError(err: Error): Promise<void> {
|
||||
errors.push(err.message);
|
||||
async processError(args: ProcessErrorArgs): Promise<void> {
|
||||
errors.push(args.error.message);
|
||||
}
|
||||
},
|
||||
{ autoComplete: autoCompleteFlag }
|
||||
|
|
|
@ -16,6 +16,7 @@ import {
|
|||
import { ServiceBusReceiver } from "../src/receivers/receiver";
|
||||
import { ServiceBusSender } from "../src/sender";
|
||||
import { ServiceBusReceivedMessageWithLock } from "../src/serviceBusMessage";
|
||||
import { ProcessErrorArgs } from "../src/models";
|
||||
|
||||
describe("Message Lock Renewal", () => {
|
||||
let serviceBusClient: ServiceBusClientForTests;
|
||||
|
@ -125,8 +126,8 @@ describe("Message Lock Renewal", () => {
|
|||
|
||||
let uncaughtErrorFromHandlers: Error | undefined;
|
||||
|
||||
async function processError(err: Error): Promise<void> {
|
||||
uncaughtErrorFromHandlers = err;
|
||||
async function processError(args: ProcessErrorArgs): Promise<void> {
|
||||
uncaughtErrorFromHandlers = args.error;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -5,7 +5,7 @@ import chai from "chai";
|
|||
const should = chai.should();
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
chai.use(chaiAsPromised);
|
||||
import { MessagingError, ServiceBusMessage, delay } from "../src";
|
||||
import { ServiceBusMessage, delay, ProcessErrorArgs } from "../src";
|
||||
import { TestClientType, TestMessage, isMessagingError } from "./utils/testUtils";
|
||||
import {
|
||||
ServiceBusClientForTests,
|
||||
|
@ -124,8 +124,8 @@ describe("Session Lock Renewal", () => {
|
|||
// const maxAutoRenewLockDurationInMs = 300*1000;
|
||||
let uncaughtErrorFromHandlers: Error | undefined;
|
||||
|
||||
async function processError(err: MessagingError | Error) {
|
||||
uncaughtErrorFromHandlers = err;
|
||||
async function processError(args: ProcessErrorArgs) {
|
||||
uncaughtErrorFromHandlers = args.error;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -334,11 +334,11 @@ describe("Session Lock Renewal", () => {
|
|||
receiver.subscribe(
|
||||
{
|
||||
processMessage,
|
||||
async processError(err: MessagingError | Error) {
|
||||
if (isMessagingError(err) && err.code === "SessionLockLostError") {
|
||||
async processError(args: ProcessErrorArgs) {
|
||||
if (isMessagingError(args.error) && args.error.code === "SessionLockLostError") {
|
||||
sessionLockLostErrorThrown = true;
|
||||
} else {
|
||||
uncaughtErrorFromHandlers = err;
|
||||
uncaughtErrorFromHandlers = args.error;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
|
@ -6,7 +6,12 @@ import chai from "chai";
|
|||
import chaiAsPromised from "chai-as-promised";
|
||||
import * as dotenv from "dotenv";
|
||||
import Long from "long";
|
||||
import { MessagingError, ServiceBusClient, ServiceBusSessionReceiver } from "../src";
|
||||
import {
|
||||
MessagingError,
|
||||
ProcessErrorArgs,
|
||||
ServiceBusClient,
|
||||
ServiceBusSessionReceiver
|
||||
} from "../src";
|
||||
import { ServiceBusSender } from "../src/sender";
|
||||
import { DispositionType, ServiceBusReceivedMessageWithLock } from "../src/serviceBusMessage";
|
||||
import { getReceiverClosedErrorMsg, getSenderClosedErrorMsg } from "../src/util/errors";
|
||||
|
@ -144,8 +149,20 @@ describe("Errors with non existing Namespace", function(): void {
|
|||
async processMessage() {
|
||||
throw "processMessage should not have been called when receive call is made from a non existing namespace";
|
||||
},
|
||||
async processError(err) {
|
||||
testError(err);
|
||||
async processError(args) {
|
||||
const actual: Omit<ProcessErrorArgs, "error"> = {
|
||||
errorSource: args.errorSource,
|
||||
entityPath: args.entityPath,
|
||||
fullyQualifiedNamespace: args.fullyQualifiedNamespace
|
||||
};
|
||||
|
||||
actual.should.deep.equal({
|
||||
errorSource: "receive",
|
||||
entityPath: receiver.entityPath,
|
||||
fullyQualifiedNamespace: sbClient.fullyQualifiedNamespace
|
||||
} as Omit<ProcessErrorArgs, "error">);
|
||||
|
||||
testError(args.error);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -217,8 +234,20 @@ describe("Errors with non existing Queue/Topic/Subscription", async function():
|
|||
async processMessage() {
|
||||
throw "processMessage should not have been called when receive call is made from a non existing namespace";
|
||||
},
|
||||
async processError(err) {
|
||||
testError(err, "some-name");
|
||||
async processError(args) {
|
||||
const actual: Omit<ProcessErrorArgs, "error"> = {
|
||||
errorSource: args.errorSource,
|
||||
entityPath: args.entityPath,
|
||||
fullyQualifiedNamespace: args.fullyQualifiedNamespace
|
||||
};
|
||||
|
||||
actual.should.deep.equal({
|
||||
errorSource: "receive",
|
||||
entityPath: receiver.entityPath,
|
||||
fullyQualifiedNamespace: sbClient.fullyQualifiedNamespace
|
||||
} as Omit<ProcessErrorArgs, "error">);
|
||||
|
||||
testError(args.error, "some-name");
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -238,8 +267,20 @@ describe("Errors with non existing Queue/Topic/Subscription", async function():
|
|||
async processMessage() {
|
||||
throw "processMessage should not have been called when receive call is made from a non existing namespace";
|
||||
},
|
||||
async processError(err) {
|
||||
testError(err, "some-topic-name/Subscriptions/some-subscription-name");
|
||||
async processError(args) {
|
||||
const expected: Omit<ProcessErrorArgs, "error"> = {
|
||||
errorSource: args.errorSource,
|
||||
entityPath: args.entityPath,
|
||||
fullyQualifiedNamespace: args.fullyQualifiedNamespace
|
||||
};
|
||||
|
||||
expected.should.deep.equal({
|
||||
errorSource: "receive",
|
||||
entityPath: receiver.entityPath,
|
||||
fullyQualifiedNamespace: sbClient.fullyQualifiedNamespace
|
||||
} as Omit<ProcessErrorArgs, "error">);
|
||||
|
||||
testError(args.error, "some-topic-name/Subscriptions/some-subscription-name");
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ import Long from "long";
|
|||
const should = chai.should();
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
chai.use(chaiAsPromised);
|
||||
import { ServiceBusReceivedMessage, delay } from "../src";
|
||||
import { ServiceBusReceivedMessage, delay, ProcessErrorArgs } from "../src";
|
||||
|
||||
import { TestClientType, TestMessage, checkWithTimeout, isMessagingError } from "./utils/testUtils";
|
||||
import { ServiceBusSender } from "../src/sender";
|
||||
|
@ -23,10 +23,8 @@ import { AbortController } from "@azure/abort-controller";
|
|||
|
||||
let unexpectedError: Error | undefined;
|
||||
|
||||
async function processError(err: Error): Promise<void> {
|
||||
if (err) {
|
||||
unexpectedError = err;
|
||||
}
|
||||
async function processError(args: ProcessErrorArgs): Promise<void> {
|
||||
unexpectedError = args.error;
|
||||
}
|
||||
|
||||
describe("session tests", () => {
|
||||
|
|
|
@ -9,6 +9,7 @@ import { getEntityNameFromConnectionString } from "../src/constructorHelpers";
|
|||
import { ServiceBusClientForTests, createServiceBusClientForTests } from "./utils/testutils2";
|
||||
import { ServiceBusSender } from "../src/sender";
|
||||
import { ServiceBusReceivedMessageWithLock } from "../src/serviceBusMessage";
|
||||
import { ProcessErrorArgs } from "../src/models";
|
||||
chai.use(chaiAsPromised);
|
||||
const assert = chai.assert;
|
||||
|
||||
|
@ -57,8 +58,8 @@ describe("Sample scenarios for track 2", () => {
|
|||
await message.complete();
|
||||
receivedBodies.push(message.body);
|
||||
},
|
||||
async processError(err: Error): Promise<void> {
|
||||
errors.push(err.message);
|
||||
async processError(args: ProcessErrorArgs): Promise<void> {
|
||||
errors.push(args.error.message);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -129,8 +130,8 @@ describe("Sample scenarios for track 2", () => {
|
|||
async processMessage(message: ServiceBusReceivedMessage): Promise<void> {
|
||||
receivedBodies.push(message.body);
|
||||
},
|
||||
async processError(err: Error): Promise<void> {
|
||||
errors.push(err.message);
|
||||
async processError(args: ProcessErrorArgs): Promise<void> {
|
||||
errors.push(args.error.message);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -212,8 +213,8 @@ describe("Sample scenarios for track 2", () => {
|
|||
await message.complete();
|
||||
receivedBodies.push(message.body);
|
||||
},
|
||||
async processError(err: Error): Promise<void> {
|
||||
errors.push(err.message);
|
||||
async processError(args: ProcessErrorArgs): Promise<void> {
|
||||
errors.push(args.error.message);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -236,8 +237,8 @@ describe("Sample scenarios for track 2", () => {
|
|||
async processMessage(message: ServiceBusReceivedMessage): Promise<void> {
|
||||
receivedBodies.push(message.body);
|
||||
},
|
||||
async processError(err: Error): Promise<void> {
|
||||
errors.push(err.message);
|
||||
async processError(args: ProcessErrorArgs): Promise<void> {
|
||||
errors.push(args.error.message);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -349,8 +350,8 @@ describe("Sample scenarios for track 2", () => {
|
|||
async processMessage(message: ServiceBusReceivedMessage): Promise<void> {
|
||||
receivedBodies.push(message.body);
|
||||
},
|
||||
async processError(err: Error): Promise<void> {
|
||||
errors.push(err.message);
|
||||
async processError(args: ProcessErrorArgs): Promise<void> {
|
||||
errors.push(args.error.message);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -385,8 +386,8 @@ describe("Sample scenarios for track 2", () => {
|
|||
async processMessage(message: ServiceBusReceivedMessage): Promise<void> {
|
||||
receivedBodies.push(message.body);
|
||||
},
|
||||
async processError(err: Error): Promise<void> {
|
||||
errors.push(err.message);
|
||||
async processError(args: ProcessErrorArgs): Promise<void> {
|
||||
errors.push(args.error.message);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import chai from "chai";
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
import { ServiceBusReceivedMessage, delay } from "../src";
|
||||
import { ServiceBusReceivedMessage, delay, ProcessErrorArgs } from "../src";
|
||||
import { getAlreadyReceivingErrorMsg } from "../src/util/errors";
|
||||
import { TestMessage, checkWithTimeout, TestClientType } from "./utils/testUtils";
|
||||
import { StreamingReceiver } from "../src/core/streamingReceiver";
|
||||
|
@ -35,10 +35,8 @@ let unexpectedError: Error | undefined;
|
|||
const maxDeliveryCount = 10;
|
||||
const testClientType = getRandomTestClientTypeWithNoSessions();
|
||||
|
||||
async function processError(err: Error): Promise<void> {
|
||||
if (err) {
|
||||
unexpectedError = err;
|
||||
}
|
||||
async function processError(args: ProcessErrorArgs): Promise<void> {
|
||||
unexpectedError = args.error;
|
||||
}
|
||||
|
||||
describe("Streaming Receiver Tests", () => {
|
||||
|
@ -177,8 +175,8 @@ describe("Streaming Receiver Tests", () => {
|
|||
|
||||
streamingReceiver.subscribe(
|
||||
async () => {},
|
||||
(err) => {
|
||||
actualError = err;
|
||||
(args) => {
|
||||
actualError = args.error;
|
||||
}
|
||||
);
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import chai from "chai";
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
import { ServiceBusReceivedMessage, delay } from "../src";
|
||||
import { ServiceBusReceivedMessage, delay, ProcessErrorArgs } from "../src";
|
||||
import { getAlreadyReceivingErrorMsg } from "../src/util/errors";
|
||||
import { TestClientType, TestMessage, checkWithTimeout } from "./utils/testUtils";
|
||||
import { DispositionType, ServiceBusReceivedMessageWithLock } from "../src/serviceBusMessage";
|
||||
|
@ -44,10 +44,8 @@ describe("Streaming with sessions", () => {
|
|||
await serviceBusClient.test.after();
|
||||
});
|
||||
|
||||
async function processError(err: Error): Promise<void> {
|
||||
if (err) {
|
||||
unexpectedError = err;
|
||||
}
|
||||
async function processError(args: ProcessErrorArgs): Promise<void> {
|
||||
unexpectedError = args.error;
|
||||
}
|
||||
|
||||
async function afterEachTest(): Promise<void> {
|
||||
|
|
Загрузка…
Ссылка в новой задаче