diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index e892351522b..60c72f14c7d 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -4,17 +4,8 @@ import chai from "chai"; import Long from "long"; import chaiAsPromised from "chai-as-promised"; -import { - ServiceBusMessage, - delay, - ProcessErrorArgs, - ServiceBusSender, - ServiceBusReceivedMessage -} from "../../src"; -import { - getAlreadyReceivingErrorMsg, - InvalidOperationForPeekedMessage -} from "../../src/util/errors"; +import { ServiceBusMessage, delay, ServiceBusSender, ServiceBusReceivedMessage } from "../../src"; +import { InvalidOperationForPeekedMessage } from "../../src/util/errors"; import { TestClientType, TestMessage } from "../public/utils/testUtils"; import { ServiceBusReceiver, ServiceBusReceiverImpl } from "../../src/receivers/receiver"; import { @@ -572,73 +563,6 @@ describe("Batching Receiver", () => { await afterEachTest(); }); - // We use an empty queue/topic here so that the first receiveMessages call takes time to return - async function testParallelReceiveCalls(): Promise { - const firstBatchPromise = receiver.receiveMessages(1, { maxWaitTimeInMs: 10000 }); - await delay(5000); - - let errorMessage; - const expectedErrorMessage = getAlreadyReceivingErrorMsg( - receiver.entityPath, - entityNames.usesSessions ? TestMessage.sessionId : undefined - ); - - try { - await receiver.receiveMessages(1); - } catch (err) { - errorMessage = err && err.message; - } - should.equal( - errorMessage, - expectedErrorMessage, - "Unexpected error message for receiveMessages" - ); - - let unexpectedError; - try { - receiver.subscribe({ - async processMessage(): Promise { - // process message here - it's basically a ServiceBusMessage minus any settlement related methods - }, - async processError(args: ProcessErrorArgs): Promise { - unexpectedError = args.error; - } - }); - } catch (err) { - errorMessage = err && err.message; - } - should.equal( - errorMessage, - expectedErrorMessage, - "Unexpected error message for registerMessageHandler" - ); - should.equal( - unexpectedError, - undefined, - "Unexpected error found in errorHandler for registerMessageHandler" - ); - - await firstBatchPromise; - } - - it( - noSessionTestClientType + - ": Throws error when ReceiveBatch is called while the previous call is not done", - async function(): Promise { - await beforeEachTest(noSessionTestClientType); - await testParallelReceiveCalls(); - } - ); - - it( - withSessionTestClientType + - ": Throws error when ReceiveBatch is called while the previous call is not done", - async function(): Promise { - await beforeEachTest(withSessionTestClientType); - await testParallelReceiveCalls(); - } - ); - const messages: ServiceBusMessage[] = [ { body: "hello1", diff --git a/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts new file mode 100644 index 00000000000..d256edfa7cf --- /dev/null +++ b/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { BatchingReceiver } from "../../../src/core/batchingReceiver"; +import { ServiceBusReceiverImpl } from "../../../src/receivers/receiver"; +import { assertThrows } from "../../public/utils/testUtils"; +import { createConnectionContextForTests, getPromiseResolverForTest } from "./unittestUtils"; +import chai from "chai"; +import { InternalMessageHandlers } from "../../../src/models"; +const assert = chai.assert; + +describe("ServiceBusReceiver unit tests", () => { + let receiver: ServiceBusReceiverImpl; + + beforeEach(() => { + receiver = new ServiceBusReceiverImpl( + createConnectionContextForTests(), + "entityPath", + "peekLock", + 0 + ); + }); + + afterEach(() => receiver.close()); + + const expectedError: Record = { + name: "Error", + message: 'The receiver for "entityPath" is already receiving messages.' + }; + + it("isAlreadyReceiving (batching first, then streaming)", async () => { + assert.isFalse(receiver["_isReceivingMessages"](), "Nothing should be receiving messages"); + + receiver["_batchingReceiver"] = { + isOpen: () => true, + isReceivingMessages: true, + close: async () => {} + } as BatchingReceiver; + + assert.isTrue(receiver["_isReceivingMessages"](), "Batching receiver is receiving messages"); + + const subscribeFn = async () => { + receiver.subscribe({ + processError: async (_errArgs) => {}, + processMessage: async (_msg) => {} + }); + }; + + await assertThrows( + subscribeFn, + expectedError, + "Trying to receive a separate way, in parallel, should throw" + ); + }); + + it("isAlreadyReceiving (streaming first, then batching)", async () => { + assert.isFalse(receiver["_isReceivingMessages"](), "Nothing should be receiving messages"); + + const { promise: subscriberInitializedPromise, resolve } = getPromiseResolverForTest(); + + receiver.subscribe({ + processInitialize: async () => { + resolve(); + }, + processError: async (_errArgs) => {}, + processMessage: async (_msg) => {} + } as InternalMessageHandlers); + + await subscriberInitializedPromise; + + assert.isTrue(receiver["_isReceivingMessages"](), "Streaming receiver is receiving messages"); + + await assertThrows( + () => receiver.receiveMessages(1), + expectedError, + "Trying to receive a separate way, in parallel, should throw" + ); + }); +}); diff --git a/sdk/servicebus/service-bus/test/public/utils/testUtils.ts b/sdk/servicebus/service-bus/test/public/utils/testUtils.ts index 268d7882bf2..64fcaa7bd84 100644 --- a/sdk/servicebus/service-bus/test/public/utils/testUtils.ts +++ b/sdk/servicebus/service-bus/test/public/utils/testUtils.ts @@ -210,7 +210,8 @@ export enum EntityNames { */ export async function assertThrows( fn: () => Promise, - expectedErr: Record + expectedErr: Record, + assertMessage?: string ): Promise { try { await fn(); @@ -225,5 +226,5 @@ export async function assertThrows( return err; } - throw new Error("assert failure: error was expected, but none was thrown"); + throw new Error(`assert failure, an error was expected, but none was thrown: ${assertMessage}`); }