diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index ed9b52313cd..fa777532d9a 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -5,6 +5,9 @@ - Re-exports `RetryMode` for use when setting the `RetryOptions.mode` field in `ServiceBusClientOptions`. Resolves [#13166](https://github.com/Azure/azure-sdk-for-js/issues/13166). +- When receiving messages from sessions using either the `ServiceBusSessionReceiver.receiveMessages` method or the `ServiceBusSessionReceiver.subscribe` method, errors on the AMQP link or session were being handled well, but an error on the AMQP connection like a network disconnect was not being handled at all. This results in the promise returned by the `receiveMessages` method never getting fulfilled and the `subscribe` method not calling the user provided error handler. + This is now fixed in [#13956](https://github.com/Azure/azure-sdk-for-js/pull/13956) to throw `SessionLockLostError`. If using the `receiveMessages` method in `receiveAndDelete` mode, then the messages collected so far are returned to avoid data loss. + - Allow null as a value for the properties in `ServiceBusMessage.applicationProperties`. Fixes [#14329](https://github.com/Azure/azure-sdk-for-js/issues/14329) diff --git a/sdk/servicebus/service-bus/src/connectionContext.ts b/sdk/servicebus/service-bus/src/connectionContext.ts index 1d0e51f72f3..4f259b67868 100644 --- a/sdk/servicebus/service-bus/src/connectionContext.ts +++ b/sdk/servicebus/service-bus/src/connectionContext.ts @@ -25,7 +25,8 @@ import { ManagementClient } from "./core/managementClient"; import { formatUserAgentPrefix } from "./util/utils"; import { getRuntimeInfo } from "./util/runtimeInfo"; import { SharedKeyCredential } from "./servicebusSharedKeyCredential"; -import { ReceiverType } from "./core/linkEntity"; +import { NonSessionReceiverType, ReceiverType } from "./core/linkEntity"; +import { ServiceBusError } from "./serviceBusError"; /** * @internal @@ -134,15 +135,16 @@ type ConnectionContextMethods = Omit< /** * @internal - * Helper method to call onDetached on the receivers from the connection context upon seeing an error. + * Helper method to call onDetached on the non-sessions batching and streaming receivers from the connection context upon seeing an error. */ async function callOnDetachedOnReceivers( connectionContext: ConnectionContext, contextOrConnectionError: Error | ConnectionError | AmqpError | undefined, - receiverType: ReceiverType + receiverType: NonSessionReceiverType ): Promise { const detachCalls: Promise[] = []; + // Iterating over non-sessions batching and streaming receivers for (const receiverName of Object.keys(connectionContext.messageReceivers)) { const receiver = connectionContext.messageReceivers[receiverName]; if (receiver && receiver.receiverType === receiverType) { @@ -165,6 +167,54 @@ async function callOnDetachedOnReceivers( ); } } + return Promise.all(detachCalls); +} + +/** + * @internal + * Helper method to call onDetached on the session receivers from the connection context upon seeing an error. + */ +async function callOnDetachedOnSessionReceivers( + connectionContext: ConnectionContext, + contextOrConnectionError: Error | ConnectionError | AmqpError | undefined +): Promise { + const getSessionError = (sessionId: string, entityPath: string) => { + const sessionInfo = + `The receiver for session "${sessionId}" in "${entityPath}" has been closed and can no longer be used. ` + + `Please create a new receiver using the "acceptSession" or "acceptNextSession" method on the ServiceBusClient.`; + + const errorMessage = + contextOrConnectionError == null + ? `Unknown error occurred on the AMQP connection while receiving messages. ` + sessionInfo + : `Error occurred on the AMQP connection while receiving messages. ` + + sessionInfo + + `\nMore info - \n${contextOrConnectionError}`; + + const error = new ServiceBusError(errorMessage, "SessionLockLost"); + error.retryable = false; + return error; + }; + + const detachCalls: Promise[] = []; + + for (const receiverName of Object.keys(connectionContext.messageSessions)) { + const receiver = connectionContext.messageSessions[receiverName]; + logger.verbose( + "[%s] calling detached on %s receiver(sessions).", + connectionContext.connection.id, + receiver.name + ); + detachCalls.push( + receiver.onDetached(getSessionError(receiver.sessionId, receiver.entityPath)).catch((err) => { + logger.logError( + err, + "[%s] An error occurred while calling onDetached() on the session receiver(sessions) '%s'", + connectionContext.connection.id, + receiver.name + ); + }) + ); + } return Promise.all(detachCalls); } @@ -375,63 +425,76 @@ export namespace ConnectionContext { await connectionContext.managementClients[entityPath].close(); } - // Calling onDetached on sender - if (!state.wasConnectionCloseCalled && state.numSenders) { - // We don't do recovery for the sender: - // Because we don't want to keep the sender active all the time - // and the "next" send call would bear the burden of creating the link. - // Call onDetached() on sender so that it can gracefully shutdown - // by cleaning up the timers and closing the links. - // We don't call onDetached for sender after `refreshConnection()` - // because any new send calls that potentially initialize links would also get affected if called later. - logger.verbose( - `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` + - `senders. We should not reconnect.` - ); - const detachCalls: Promise[] = []; - for (const senderName of Object.keys(connectionContext.senders)) { - const sender = connectionContext.senders[senderName]; - if (sender) { - logger.verbose( - "[%s] calling detached on sender '%s'.", - connectionContext.connection.id, - sender.name - ); - detachCalls.push( - sender.onDetached().catch((err) => { - logger.logError( - err, - "[%s] An error occurred while calling onDetached() the sender '%s'", - connectionContext.connection.id, - sender.name - ); - }) - ); + if (state.wasConnectionCloseCalled) { + // Do Nothing + } else { + // Calling onDetached on sender + if (state.numSenders) { + // We don't do recovery for the sender: + // Because we don't want to keep the sender active all the time + // and the "next" send call would bear the burden of creating the link. + // Call onDetached() on sender so that it can gracefully shutdown + // by cleaning up the timers and closing the links. + // We don't call onDetached for sender after `refreshConnection()` + // because any new send calls that potentially initialize links would also get affected if called later. + logger.verbose( + `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` + + `senders. We should not reconnect.` + ); + const detachCalls: Promise[] = []; + for (const senderName of Object.keys(connectionContext.senders)) { + const sender = connectionContext.senders[senderName]; + if (sender) { + logger.verbose( + "[%s] calling detached on sender '%s'.", + connectionContext.connection.id, + sender.name + ); + detachCalls.push( + sender.onDetached().catch((err) => { + logger.logError( + err, + "[%s] An error occurred while calling onDetached() the sender '%s'", + connectionContext.connection.id, + sender.name + ); + }) + ); + } } + await Promise.all(detachCalls); + } + + // Calling onDetached on batching receivers for the same reasons as sender + const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching"); + if (numBatchingReceivers) { + logger.verbose( + `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` + + `batching receivers. We should not reconnect.` + ); + + // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation + await callOnDetachedOnReceivers( + connectionContext, + connectionError || contextError, + "batching" + ); + } + + // Calling onDetached on session receivers + const numSessionReceivers = getNumberOfReceivers(connectionContext, "session"); + if (numSessionReceivers) { + logger.verbose( + `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numSessionReceivers} ` + + `session receivers. We should close them.` + ); + + await callOnDetachedOnSessionReceivers( + connectionContext, + connectionError || contextError + ); } - await Promise.all(detachCalls); } - - // Calling onDetached on batching receivers for the same reasons as sender - const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching"); - if (!state.wasConnectionCloseCalled && numBatchingReceivers) { - logger.verbose( - `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` + - `batching receivers. We should reconnect.` - ); - - // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation - await callOnDetachedOnReceivers( - connectionContext, - connectionError || contextError, - "batching" - ); - - // TODO: - // `callOnDetachedOnReceivers` handles "connectionContext.messageReceivers". - // ...What to do for sessions (connectionContext.messageSessions) ?? - } - await refreshConnection(); waitForConnectionRefreshResolve(); waitForConnectionRefreshPromise = undefined; diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index cf4682de4b7..0e9ade3842e 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -112,7 +112,6 @@ export class BatchingReceiver extends MessageReceiver { options: OperationOptionsBase ): Promise { throwErrorIfConnectionClosed(this._context); - try { logger.verbose( "[%s] Receiver '%s', setting max concurrent calls to 0.", diff --git a/sdk/servicebus/service-bus/src/core/linkEntity.ts b/sdk/servicebus/service-bus/src/core/linkEntity.ts index e41b7d1baa3..6e4990cdd4a 100644 --- a/sdk/servicebus/service-bus/src/core/linkEntity.ts +++ b/sdk/servicebus/service-bus/src/core/linkEntity.ts @@ -48,10 +48,14 @@ export interface RequestResponseLinkOptions { /** * @internal */ -export type ReceiverType = +export type NonSessionReceiverType = | "batching" // batching receiver - | "streaming" // streaming receiver; - | "session"; // message session + | "streaming"; // streaming receiver + +/** + * @internal + */ +export type ReceiverType = NonSessionReceiverType | "session"; // message session /** * @internal diff --git a/sdk/servicebus/service-bus/src/serviceBusError.ts b/sdk/servicebus/service-bus/src/serviceBusError.ts index 5828d168f1c..1eb70a8f455 100644 --- a/sdk/servicebus/service-bus/src/serviceBusError.ts +++ b/sdk/servicebus/service-bus/src/serviceBusError.ts @@ -156,7 +156,7 @@ export class ServiceBusError extends MessagingError { /** * Translates an error into either an Error or a ServiceBusError which provides a `reason` code that - * can be used by clients to programatically react to errors. + * can be used by clients to programmatically react to errors. * * If you are calling `@azure/core-amqp/translate` you should swap to using this function instead since it provides * Service Bus specific handling of the error (falling back to default translate behavior otherwise). diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index 4ddbef70709..1c44ed98bd1 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -534,7 +534,7 @@ export class MessageSession extends LinkEntity { /** * Closes the underlying AMQP receiver link. */ - async close(): Promise { + async close(error?: Error | AmqpError): Promise { try { this._isReceivingMessagesForSubscriber = false; if (this._sessionLockRenewalTimer) clearTimeout(this._sessionLockRenewalTimer); @@ -546,7 +546,7 @@ export class MessageSession extends LinkEntity { await super.close(); - await this._batchingReceiverLite.terminate(); + this._batchingReceiverLite.terminate(error); } catch (err) { logger.logError( err, @@ -762,6 +762,36 @@ export class MessageSession extends LinkEntity { } } + /** + * To be called when connection is disconnected to gracefully close ongoing receive request. + * @param connectionError - The connection error if any. + */ + async onDetached(connectionError: AmqpError | Error): Promise { + logger.error( + translateServiceBusError(connectionError), + `${this.logPrefix} onDetached: closing link (session receiver will not reconnect)` + ); + try { + // Notifying so that the streaming receiver knows about the error + this._notifyError({ + entityPath: this.entityPath, + fullyQualifiedNamespace: this._context.config.host, + error: translateServiceBusError(connectionError), + errorSource: "receive" + }); + } catch (error) { + logger.error( + translateServiceBusError(error), + `${ + this.logPrefix + } onDetached: unexpected error seen when tried calling "_notifyError" with ${translateServiceBusError( + connectionError + )}` + ); + } + await this.close(connectionError); + } + /** * Settles the message with the specified disposition. * @param message - The ServiceBus Message that needs to be settled. diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index 2d5b71d0b22..11bad23dc59 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -21,17 +21,19 @@ import { ServiceBusClientForTests, createServiceBusClientForTests, testPeekMsgsLength, - getRandomTestClientTypeWithSessions, getRandomTestClientTypeWithNoSessions, EntityName, - getRandomTestClientType + getRandomTestClientType, + getRandomTestClientTypeWithSessions } from "../public/utils/testutils2"; import { AbortController } from "@azure/abort-controller"; -import { ReceiverEvents } from "rhea-promise"; +import { Receiver, ReceiverEvents } from "rhea-promise"; import { ServiceBusSessionReceiver, ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver"; +import { ConnectionContext } from "../../src/connectionContext"; +import { LinkEntity } from "../../src/core/linkEntity"; const should = chai.should(); chai.use(chaiAsPromised); @@ -640,12 +642,12 @@ describe("Batching Receiver", () => { { body: "hello1", messageId: `test message ${Math.random()}`, - partitionKey: "dummy" // partitionKey is only for partitioned queue/subscrption, Unpartitioned queue/subscrption do not care about partitionKey. + partitionKey: "dummy" // partitionKey is only for partitioned queue/subscription, Unpartitioned queue/subscription do not care about partitionKey. }, { body: "hello2", messageId: `test message ${Math.random()}`, - partitionKey: "dummy" // partitionKey is only for partitioned queue/subscrption, Unpartitioned queue/subscrption do not care about partitionKey. + partitionKey: "dummy" // partitionKey is only for partitioned queue/subscription, Unpartitioned queue/subscription do not care about partitionKey. } ]; const messageWithSessions: ServiceBusMessage[] = [ @@ -661,7 +663,7 @@ describe("Batching Receiver", () => { } ]; - // We test for mutilple receiveMessages specifically to ensure that batchingRecevier on a client is reused + // We test for multiple receiveMessages specifically to ensure that batchingReceiver on a client is reused // See https://github.com/Azure/azure-service-bus-node/issues/31 async function testSequentialReceiveBatchCalls(): Promise { const testMessages = entityNames.usesSessions ? messageWithSessions : messages; @@ -883,84 +885,18 @@ describe("Batching Receiver", () => { ); }); - describe(noSessionTestClientType + ": Batch Receiver - disconnects", function(): void { - before(() => { - serviceBusClient = createServiceBusClientForTests(); - }); - - after(() => { - return serviceBusClient.test.after(); - }); - - afterEach(async () => { - await afterEachTest(); - }); - - it("can receive and settle messages after a disconnect", async function(): Promise { - // Create the sender and receiver. - await beforeEachTest(noSessionTestClientType); - - // Send a message so we can be sure when the receiver is open and active. - await sender.sendMessages(TestMessage.getSample()); - - let settledMessageCount = 0; - - const messages1 = await receiver.receiveMessages(1); - for (const message of messages1) { - await receiver.completeMessage(message); - settledMessageCount++; - } - - settledMessageCount.should.equal(1, "Unexpected number of settled messages."); - - const connectionContext = (receiver as any)["_context"]; - const refreshConnection = connectionContext.refreshConnection; - let refreshConnectionCalled = 0; - connectionContext.refreshConnection = function(...args: any) { - refreshConnectionCalled++; - refreshConnection.apply(this, args); - }; - - // Simulate a disconnect being called with a non-retryable error. - (receiver as ServiceBusReceiverImpl)["_context"].connection["_connection"].idle(); - - // send a second message to trigger the message handler again. - await sender.sendMessages(TestMessage.getSample()); - - // wait for the 2nd message to be received. - const messages2 = await (receiver as ServiceBusReceiver).receiveMessages(1); - for (const message of messages2) { - await receiver.completeMessage(message); - settledMessageCount++; - } - settledMessageCount.should.equal(2, "Unexpected number of settled messages."); - refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); - }); - - it("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< - void - > { - // Create the sender and receiver. - await beforeEachTest(noSessionTestClientType, "receiveAndDelete"); - - // The first time `receiveMessages` is called the receiver link is created. - // The `receiver_drained` handler is only added after the link is created, - // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; - const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; - + describe("Batch Receiver - disconnects", () => { + function simulateDisconnectDuringDrain( + receiverContext: ConnectionContext, + batchingReceiver: LinkEntity | undefined, + didRequestDrainResolver: Function + ) { if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`Unable to initialize receiver link.`); + throw new Error(`batchingReceiver is not open or passed undefined.`); } - - // Send a message so we have something to receive. - await sender.sendMessages(TestMessage.getSample()); - // We want to simulate a disconnect once the batching receiver is draining. // We can detect when the receiver enters a draining state when `addCredit` is - // called while `drain` is set to true. - let didRequestDrain = false; + // called while didRequestDrainResolver is called to resolve the promise. const addCredit = batchingReceiver["link"]!.addCredit; batchingReceiver["link"]!.addCredit = function(credits) { // This makes sure the receiveMessages doesn't end because of draining before the disconnect is triggered @@ -968,255 +904,507 @@ describe("Batching Receiver", () => { batchingReceiver["link"]!.removeAllListeners(ReceiverEvents.receiverDrained); addCredit.call(this, credits); if (batchingReceiver["link"]!.drain) { - didRequestDrain = true; + didRequestDrainResolver(); // Simulate a disconnect being called with a non-retryable error. receiverContext.connection["_connection"].idle(); } }; + } - // Purposefully request more messages than what's available - // so that the receiver will have to drain. - const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + describe(noSessionTestClientType + ": Batch Receiver - disconnects", function(): void { + before(() => { + serviceBusClient = createServiceBusClientForTests(); + }); - didRequestDrain.should.equal(true, "Drain was not requested."); - messages1.length.should.equal(1, "Unexpected number of messages received."); + after(() => { + return serviceBusClient.test.after(); + }); - // Make sure that a 2nd receiveMessages call still works - // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); + afterEach(async () => { + await afterEachTest(); + }); - // wait for the 2nd message to be received. - const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + it("can receive and settle messages after a disconnect", async function(): Promise { + // Create the sender and receiver. + await beforeEachTest(noSessionTestClientType); - messages2.length.should.equal(1, "Unexpected number of messages received."); - }); + // Send a message so we can be sure when the receiver is open and active. + await sender.sendMessages(TestMessage.getSample()); - it("throws an error if drain is in progress (peekLock)", async function(): Promise { - // Create the sender and receiver. - await beforeEachTest(noSessionTestClientType); + let settledMessageCount = 0; - // The first time `receiveMessages` is called the receiver link is created. - // The `receiver_drained` handler is only added after the link is created, - // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; - const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; - - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`Unable to initialize receiver link.`); - } - - // Send a message so we have something to receive. - await sender.sendMessages(TestMessage.getSample()); - - // Since the receiver has already been initialized, - // the `receiver_drained` handler is attached as soon - // as receiveMessages is invoked. - // We remove the `receiver_drained` timeout after `receiveMessages` - // does it's initial setup by wrapping it in a `setTimeout`. - // This triggers the `receiver_drained` handler removal on the next - // tick of the event loop; after the handler has been attached. - setTimeout(() => { - // remove `receiver_drained` event - batchingReceiver["link"]!.removeAllListeners(ReceiverEvents.receiverDrained); - }, 0); - - // We want to simulate a disconnect once the batching receiver is draining. - // We can detect when the receiver enters a draining state when `addCredit` is - // called while `drain` is set to true. - let didRequestDrain = false; - const addCredit = batchingReceiver["link"]!.addCredit; - batchingReceiver["link"]!.addCredit = function(credits) { - didRequestDrain = true; - addCredit.call(this, credits); - if (batchingReceiver["link"]!.drain) { - // Simulate a disconnect being called with a non-retryable error. - receiverContext.connection["_connection"].idle(); + const messages1 = await receiver.receiveMessages(1); + for (const message of messages1) { + await receiver.completeMessage(message); + settledMessageCount++; } - }; - // Purposefully request more messages than what's available - // so that the receiver will have to drain. - const testFailureMessage = "Test failure"; - try { - await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); - throw new Error(testFailureMessage); - } catch (err) { - err.message && err.message.should.not.equal(testFailureMessage); - } + settledMessageCount.should.equal(1, "Unexpected number of settled messages."); - didRequestDrain.should.equal(true, "Drain was not requested."); + const connectionContext = (receiver as any)["_context"]; + const refreshConnection = connectionContext.refreshConnection; + let refreshConnectionCalled = 0; + connectionContext.refreshConnection = function(...args: any) { + refreshConnectionCalled++; + refreshConnection.apply(this, args); + }; - // Make sure that a 2nd receiveMessages call still works - // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); + // Simulate a disconnect being called with a non-retryable error. + (receiver as ServiceBusReceiverImpl)["_context"].connection["_connection"].idle(); - // wait for the 2nd message to be received. - const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + // send a second message to trigger the message handler again. + await sender.sendMessages(TestMessage.getSample()); - messages.length.should.equal(1, "Unexpected number of messages received."); - }); + // wait for the 2nd message to be received. + const messages2 = await receiver.receiveMessages(1); + for (const message of messages2) { + await receiver.completeMessage(message); + settledMessageCount++; + } + settledMessageCount.should.equal(2, "Unexpected number of settled messages."); + refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); + }); - it("returns messages if receive in progress (receiveAndDelete)", async function(): Promise< - void - > { - // Create the sender and receiver. - await beforeEachTest(noSessionTestClientType, "receiveAndDelete"); + it("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< + void + > { + // Create the sender and receiver. + await beforeEachTest(noSessionTestClientType, "receiveAndDelete"); - // The first time `receiveMessages` is called the receiver link is created. - // The `receiver_drained` handler is only added after the link is created, - // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; - const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + // The first time `receiveMessages` is called the receiver link is created. + // The `receiver_drained` handler is only added after the link is created, + // which is a non-blocking task. + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); + const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`Unable to initialize receiver link.`); - } + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSample()); - // Send a message so we have something to receive. - await sender.sendMessages(TestMessage.getSample()); + const didRequestDrain = new Promise((resolve) => { + simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); + }); - // Simulate a disconnect after a message has been received. - batchingReceiver["link"]!.once("message", function() { + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + + await didRequestDrain; + messages1.length.should.equal(1, "Unexpected number of messages received."); + + // Make sure that a 2nd receiveMessages call still works + // by sending and receiving a single message again. + await sender.sendMessages(TestMessage.getSample()); + + // wait for the 2nd message to be received. + const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + + messages2.length.should.equal(1, "Unexpected number of messages received."); + }); + + it("throws an error if drain is in progress (peekLock)", async function(): Promise { + // Create the sender and receiver. + await beforeEachTest(noSessionTestClientType); + + // The first time `receiveMessages` is called the receiver link is created. + // The `receiver_drained` handler is only added after the link is created, + // which is a non-blocking task. + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); + const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSample()); + + const didRequestDrain = new Promise((resolve) => { + simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); + }); + + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const testFailureMessage = "Test failure"; + try { + await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + throw new Error(testFailureMessage); + } catch (err) { + err.message && err.message.should.not.equal(testFailureMessage); + } + + await didRequestDrain; + + // Make sure that a 2nd receiveMessages call still works + // by sending and receiving a single message again. + await sender.sendMessages(TestMessage.getSample()); + + // wait for the 2nd message to be received. + const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + + messages.length.should.equal(1, "Unexpected number of messages received."); + }); + + it("returns messages if receive in progress (receiveAndDelete)", async function(): Promise< + void + > { + // Create the sender and receiver. + await beforeEachTest(noSessionTestClientType, "receiveAndDelete"); + + // The first time `receiveMessages` is called the receiver link is created. + // The `receiver_drained` handler is only added after the link is created, + // which is a non-blocking task. + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); + const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + + if (!batchingReceiver || !batchingReceiver.isOpen()) { + throw new Error(`batchingReceiver is not open or undefined.`); + } + + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSample()); + + // Simulate a disconnect after a message has been received. + batchingReceiver["link"]!.once("message", function() { + setTimeout(() => { + // Simulate a disconnect being called with a non-retryable error. + receiverContext.connection["_connection"].idle(); + }, 0); + }); + + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); + + messages1.length.should.equal(1, "Unexpected number of messages received."); + + // Make sure that a 2nd receiveMessages call still works + // by sending and receiving a single message again. + await sender.sendMessages(TestMessage.getSample()); + + // wait for the 2nd message to be received. + const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + + messages2.length.should.equal(1, "Unexpected number of messages received."); + }); + + it("throws an error if receive is in progress (peekLock)", async function(): Promise { + // Create the sender and receiver. + await beforeEachTest(noSessionTestClientType); + + // The first time `receiveMessages` is called the receiver link is created. + // The `receiver_drained` handler is only added after the link is created, + // which is a non-blocking task. + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); + const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + + // Simulate a disconnect setTimeout(() => { // Simulate a disconnect being called with a non-retryable error. receiverContext.connection["_connection"].idle(); }, 0); + + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const testFailureMessage = "Test failure"; + try { + const msgs = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); + console.log(msgs.length); + throw new Error(testFailureMessage); + } catch (err) { + err.message && err.message.should.not.equal(testFailureMessage); + } + + // Make sure that a 2nd receiveMessages call still works + // by sending and receiving a single message again. + await sender.sendMessages(TestMessage.getSample()); + + // wait for the 2nd message to be received. + const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + + messages.length.should.equal(1, "Unexpected number of messages received."); }); - - // Purposefully request more messages than what's available - // so that the receiver will have to drain. - const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); - - messages1.length.should.equal(1, "Unexpected number of messages received."); - - // Make sure that a 2nd receiveMessages call still works - // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); - - // wait for the 2nd message to be received. - const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); - - messages2.length.should.equal(1, "Unexpected number of messages received."); }); - it("throws an error if receive is in progress (peekLock)", async function(): Promise { - // Create the sender and receiver. - await beforeEachTest(noSessionTestClientType); + describe(withSessionTestClientType + ": Batch Receiver - disconnects", function(): void { + let serviceBusClient: ServiceBusClientForTests; + let sender: ServiceBusSender; + let receiver: ServiceBusSessionReceiver; - // The first time `receiveMessages` is called the receiver link is created. - // The `receiver_drained` handler is only added after the link is created, - // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; - const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + async function beforeEachTest( + receiveMode: "peekLock" | "receiveAndDelete" = "peekLock" + ): Promise { + serviceBusClient = createServiceBusClientForTests(); + entityNames = await serviceBusClient.test.createTestEntities(withSessionTestClientType); + if (receiveMode == "receiveAndDelete") { + receiver = (await serviceBusClient.test.createReceiveAndDeleteReceiver( + entityNames + )) as ServiceBusSessionReceiver; + } else { + receiver = (await serviceBusClient.test.createPeekLockReceiver( + entityNames + )) as ServiceBusSessionReceiver; + } - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`Unable to initialize receiver link.`); - } - - // Simulate a disconnect - setTimeout(() => { - // Simulate a disconnect being called with a non-retryable error. - receiverContext.connection["_connection"].idle(); - }, 0); - - // Purposefully request more messages than what's available - // so that the receiver will have to drain. - const testFailureMessage = "Test failure"; - try { - const msgs = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); - console.log(msgs.length); - throw new Error(testFailureMessage); - } catch (err) { - err.message && err.message.should.not.equal(testFailureMessage); - } - - // Make sure that a 2nd receiveMessages call still works - // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); - - // wait for the 2nd message to be received. - const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); - - messages.length.should.equal(1, "Unexpected number of messages received."); - }); - }); - - describe(withSessionTestClientType + ": Batch Receiver - disconnects", function(): void { - let serviceBusClient: ServiceBusClientForTests; - let sender: ServiceBusSender; - let receiver: ServiceBusSessionReceiver; - - async function beforeEachTest( - receiveMode: "peekLock" | "receiveAndDelete" = "peekLock" - ): Promise { - serviceBusClient = createServiceBusClientForTests(); - const entityNames = await serviceBusClient.test.createTestEntities(withSessionTestClientType); - if (receiveMode == "receiveAndDelete") { - receiver = (await serviceBusClient.test.createReceiveAndDeleteReceiver( - entityNames - )) as ServiceBusSessionReceiver; - } else { - receiver = (await serviceBusClient.test.createPeekLockReceiver( - entityNames - )) as ServiceBusSessionReceiver; - } - - sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) - ); - } - - afterEach(async () => { - if (serviceBusClient) { - await serviceBusClient.test.afterEach(); - await serviceBusClient.test.after(); - } - }); - - it(`throws "session lock has expired" after a disconnect`, async function(): Promise { - // Create the sender and receiver. - await beforeEachTest(); - - // Send a message so we can be sure when the receiver is open and active. - const message = TestMessage.getSessionSample(); - await sender.sendMessages(message); - - let settledMessageCount = 0; - - const messages1 = await receiver.receiveMessages(1); - for (const message of messages1) { - await receiver.completeMessage(message); - settledMessageCount++; - } - - settledMessageCount.should.equal(1, "Unexpected number of settled messages."); - - const connectionContext = (receiver as any)["_context"]; - const refreshConnection = connectionContext.refreshConnection; - let refreshConnectionCalled = 0; - connectionContext.refreshConnection = function(...args: any) { - refreshConnectionCalled++; - refreshConnection.apply(this, args); - }; - - // Simulate a disconnect being called with a non-retryable error. - (receiver as ServiceBusSessionReceiverImpl)["_context"].connection["_connection"].idle(); - - // send a second message to trigger the message handler again. - await sender.sendMessages(TestMessage.getSessionSample()); - try { - await receiver.receiveMessages(1); - assert.fail("receiveMessages should have failed"); - } catch (error) { - should.equal( - error.message, - `The session lock has expired on the session with id ${message.sessionId}` + sender = serviceBusClient.test.addToCleanup( + serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); } - refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); + + afterEach(async () => { + if (serviceBusClient) { + await serviceBusClient.test.afterEach(); + await serviceBusClient.test.after(); + } + }); + + it(`throws "session lock has expired" after a disconnect`, async function(): Promise { + // Create the sender and receiver. + await beforeEachTest(); + + // Send a message so we can be sure when the receiver is open and active. + const message = TestMessage.getSessionSample(); + await sender.sendMessages(message); + + let settledMessageCount = 0; + + const messages1 = await receiver.receiveMessages(1); + for (const message of messages1) { + await receiver.completeMessage(message); + settledMessageCount++; + } + + settledMessageCount.should.equal(1, "Unexpected number of settled messages."); + + const connectionContext = (receiver as any)["_context"]; + const refreshConnection = connectionContext.refreshConnection; + let refreshConnectionCalled = 0; + connectionContext.refreshConnection = function(...args: any) { + refreshConnectionCalled++; + refreshConnection.apply(this, args); + }; + + // Simulate a disconnect being called with a non-retryable error. + (receiver as ServiceBusSessionReceiverImpl)["_context"].connection["_connection"].idle(); + + // send a second message to trigger the message handler again. + await sender.sendMessages(TestMessage.getSessionSample()); + try { + await receiver.receiveMessages(1); + assert.fail("receiveMessages should have failed"); + } catch (error) { + should.equal( + error.message, + `The session lock has expired on the session with id ${message.sessionId}` + ); + } + refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); + }); + + it("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< + void + > { + // Create the sender and receiver. + await beforeEachTest("receiveAndDelete"); + + // Send a message so we can be sure when the receiver is open and active. + await sender.sendMessages(TestMessage.getSessionSample()); + + const messages1 = await receiver.receiveMessages(1); + should.equal( + messages1.length, + 1, + "Unexpected number of received messages(before disconnect)." + ); + + const receiverContext = (receiver as ServiceBusSessionReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusSessionReceiverImpl)["_messageSession"]; + + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSessionSample()); + + const didRequestDrain = new Promise((resolve) => { + simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); + }); + + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const messages2 = await receiver.receiveMessages(10); + + await didRequestDrain; + messages2.length.should.equal( + 1, + "Unexpected number of messages received(during disconnect)." + ); + + await sender.sendMessages(TestMessage.getSessionSample()); + + try { + // New receiveMessages should fail because the session lock would be lost due to the disconnection + await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + assert.fail("Receive messages call should have failed since the lock was lost"); + } catch (error) { + should.equal( + error.message, + `The session lock has expired on the session with id ${TestMessage.sessionId}`, + "Unexpected error thrown" + ); + await delay(2000); // Adding a delay of 2 sec to make sure the flaky ness goes away + // wait for the 2nd message to be received. + receiver = (await serviceBusClient.test.createReceiveAndDeleteReceiver( + entityNames + )) as ServiceBusSessionReceiver; + const messages3 = await receiver.receiveMessages(1); + + messages3.length.should.equal( + 1, + "Unexpected number of messages received(upon reconnecting)." + ); + } + }); + + it("throws an error if drain is in progress (peekLock)", async function(): Promise { + // Create the sender and receiver. + await beforeEachTest(); + + // Send a message so we can be sure when the receiver is open and active. + await sender.sendMessages(TestMessage.getSessionSample()); + + const messages1 = await receiver.receiveMessages(1); + should.equal( + messages1.length, + 1, + "Unexpected number of received messages(before disconnect)." + ); + + const receiverContext = (receiver as ServiceBusSessionReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusSessionReceiverImpl)["_messageSession"]; + + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSessionSample()); + + const didRequestDrain = new Promise((resolve) => { + simulateDisconnectDuringDrain(receiverContext, batchingReceiver, resolve); + }); + + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const testFailureMessage = "Test failure"; + try { + await receiver.receiveMessages(10, { maxWaitTimeInMs: 5000 }); + throw new Error(testFailureMessage); + } catch (err) { + err.message && + err.code.should.equal("SessionLockLost") && + err.message.should.not.equal(testFailureMessage); + } + + await didRequestDrain; + }); + + it("returns messages if receive in progress (receiveAndDelete)", async function(): Promise< + void + > { + // Create the sender and receiver. + await beforeEachTest("receiveAndDelete"); + + // Send a message so we can be sure when the receiver is open and active. + await sender.sendMessages(TestMessage.getSessionSample()); + + const messages1 = await receiver.receiveMessages(1); + should.equal( + messages1.length, + 1, + "Unexpected number of received messages(before disconnect)." + ); + + const receiverContext = (receiver as ServiceBusSessionReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusSessionReceiverImpl)["_messageSession"]; + + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSessionSample()); + + // Simulate a disconnect after a message has been received. + batchingReceiver["link"]!.once("message", function() { + setTimeout(() => { + // Simulate a disconnect being called with a non-retryable error. + receiverContext.connection["_connection"].idle(); + }, 0); + }); + + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const messages2 = await receiver.receiveMessages(10); + + messages2.length.should.equal( + 1, + "Unexpected number of messages received(during disconnect)." + ); + + await sender.sendMessages(TestMessage.getSessionSample()); + + try { + // New receiveMessages should fail because the session lock would be lost due to the disconnection + await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + assert.fail("Receive messages call should have failed since the lock was lost"); + } catch (error) { + should.equal( + error.message, + `The session lock has expired on the session with id ${TestMessage.sessionId}`, + "Unexpected error thrown" + ); + // wait for the 2nd message to be received. + await receiver.close(); + receiver = (await serviceBusClient.test.createReceiveAndDeleteReceiver( + entityNames + )) as ServiceBusSessionReceiver; + const messages3 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + + messages3.length.should.equal( + 1, + "Unexpected number of messages received(upon reconnecting)." + ); + } + }); + + it("throws an error if receive is in progress (peekLock)", async function(): Promise { + // Create the sender and receiver. + await beforeEachTest(); + + // Send a message so we can be sure when the receiver is open and active. + await sender.sendMessages(TestMessage.getSessionSample()); + + const messages1 = await receiver.receiveMessages(1); + should.equal( + messages1.length, + 1, + "Unexpected number of received messages(before disconnect)." + ); + + const receiverContext = (receiver as ServiceBusSessionReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusSessionReceiverImpl)["_messageSession"]; + + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSessionSample()); + + // Simulate a disconnect after a message has been received. + batchingReceiver["link"]!.once("message", function() { + setTimeout(() => { + // Simulate a disconnect being called with a non-retryable error. + receiverContext.connection["_connection"].idle(); + }, 0); + }); + + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const testFailureMessage = "Test failure"; + try { + await receiver.receiveMessages(10, { maxWaitTimeInMs: 5000 }); + throw new Error(testFailureMessage); + } catch (err) { + err.message && + err.code.should.equal("SessionLockLost") && + err.message.should.not.equal(testFailureMessage); + } + }); }); }); }); diff --git a/sdk/servicebus/service-bus/test/public/sessionsTests.spec.ts b/sdk/servicebus/service-bus/test/public/sessionsTests.spec.ts index e5279e0cb12..610d2fb8e43 100644 --- a/sdk/servicebus/service-bus/test/public/sessionsTests.spec.ts +++ b/sdk/servicebus/service-bus/test/public/sessionsTests.spec.ts @@ -1,12 +1,18 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import chai from "chai"; +import chai, { assert } from "chai"; import Long from "long"; const should = chai.should(); import chaiAsPromised from "chai-as-promised"; chai.use(chaiAsPromised); -import { ServiceBusReceivedMessage, delay, ProcessErrorArgs, isServiceBusError } from "../../src"; +import { + ServiceBusReceivedMessage, + delay, + ProcessErrorArgs, + isServiceBusError, + ServiceBusError +} from "../../src"; import { TestClientType, TestMessage, checkWithTimeout } from "./utils/testUtils"; import { ServiceBusSender } from "../../src"; @@ -19,6 +25,8 @@ import { getRandomTestClientTypeWithSessions } from "./utils/testutils2"; import { AbortController } from "@azure/abort-controller"; +import sinon from "sinon"; +import { ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver"; let unexpectedError: Error | undefined; @@ -320,7 +328,7 @@ describe("session tests", () => { * https://github.com/Azure/azure-sdk-for-js/pull/8447#issuecomment-618510245 * If support for this is added in the future, we can stop skipping this test. */ -describe.skip("SessionReceiver - disconnects", function(): void { +describe.skip("SessionReceiver - disconnects - (if recovery is supported in future)", function(): void { let serviceBusClient: ServiceBusClientForTests; async function beforeEachTest(testClientType: TestClientType): Promise { serviceBusClient = createServiceBusClientForTests(); @@ -411,3 +419,86 @@ describe.skip("SessionReceiver - disconnects", function(): void { refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); }); }); + +describe("SessionReceiver - disconnects", function(): void { + let serviceBusClient: ServiceBusClientForTests; + async function beforeEachTest(testClientType: TestClientType): Promise { + serviceBusClient = createServiceBusClientForTests(); + return serviceBusClient.test.createTestEntities(testClientType); + } + + after(() => { + return serviceBusClient.test.after(); + }); + + it("calls processError and closes the link", async function(): Promise { + const testMessage = TestMessage.getSessionSample(); + // Create the sender and receiver. + const entityName = await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + const receiver = await serviceBusClient.acceptSession( + entityName.queue!, + testMessage.sessionId, + { + maxAutoLockRenewalDurationInMs: 10000 // Lower this value so that test can complete in time. + } + ); + + let receiverSecondMessageResolver: Function; + let errorIsThrownResolver: Function; + const errorIsThrown = new Promise((resolve) => { + errorIsThrownResolver = resolve; + }); + const receiverSecondMessage = new Promise((resolve) => { + receiverSecondMessageResolver = resolve; + }); + + const sender = serviceBusClient.createSender(entityName.queue!); + should.equal(receiver.isClosed, false, "Receiver should not have been closed"); + const isCloseCalledSpy = sinon.spy( + (receiver as ServiceBusSessionReceiverImpl)["_messageSession"], + "close" + ); + + // Send a message so we can be sure when the receiver is open and active. + await sender.sendMessages(testMessage); + receiver.subscribe( + { + async processMessage(_message: ServiceBusReceivedMessage) { + // Simulate a disconnect being called with a non-retryable error. + (receiver as any)["_context"].connection["_connection"].idle(); + }, + async processError(err) { + errorIsThrownResolver(err); + } + }, + { autoCompleteMessages: false } + ); + + const err = await errorIsThrown; + + should.equal( + (err.error as ServiceBusError).code, + "SessionLockLost", + "error code is not SessionLockLost" + ); + assert.isTrue(isCloseCalledSpy.called, "Close should have been called on the message session"); + + // send a second message to trigger the message handler again. + await sender.sendMessages(TestMessage.getSessionSample()); + const receiver2 = await serviceBusClient.acceptSession( + entityName.queue!, + testMessage.sessionId, + { + maxAutoLockRenewalDurationInMs: 10000 // Lower this value so that test can complete in time. + } + ); + receiver2.subscribe({ + async processMessage(_message: ServiceBusReceivedMessage) { + receiverSecondMessageResolver(); + }, + async processError(_err) {} + }); + await receiverSecondMessage; + await receiver2.close(); + }); +});