[Service Bus] Fix no shadow, redundant await linter errors (#13757)

This commit is contained in:
Ramya Rao 2021-02-12 09:12:25 -08:00 коммит произвёл GitHub
Родитель 7fbc2611d6
Коммит 8b55c9f721
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 241 добавлений и 273 удалений

Просмотреть файл

@ -31,14 +31,14 @@ export class BatchingReceiver extends MessageReceiver {
/**
* Instantiate a new BatchingReceiver.
*
* @param context - The client entity context.
* @param connectionContext - The client entity context.
* @param options - Options for how you'd like to connect.
*/
constructor(context: ConnectionContext, entityPath: string, options: ReceiveOptions) {
super(context, entityPath, "batching", options);
constructor(connectionContext: ConnectionContext, entityPath: string, options: ReceiveOptions) {
super(connectionContext, entityPath, "batching", options);
this._batchingReceiverLite = new BatchingReceiverLite(
context,
connectionContext,
entityPath,
async (abortSignal?: AbortSignalLike): Promise<MinimalReceiver | undefined> => {
let lastError: Error | AmqpError | undefined;

Просмотреть файл

@ -67,10 +67,10 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
private _onSessionClose: OnAmqpEvent;
private _retryOptions: RetryOptions;
constructor(context: ConnectionContext, entityPath: string, retryOptions: RetryOptions) {
super(entityPath, entityPath, context, "sender", logger, {
constructor(connectionContext: ConnectionContext, entityPath: string, retryOptions: RetryOptions) {
super(entityPath, entityPath, connectionContext, "sender", logger, {
address: entityPath,
audience: `${context.config.endpoint}${entityPath}`
audience: `${connectionContext.config.endpoint}${entityPath}`
});
this._retryOptions = retryOptions;
this._onAmqpError = (context: EventContext) => {

Просмотреть файл

@ -114,11 +114,11 @@ export class StreamingReceiver extends MessageReceiver {
/**
* Instantiate a new Streaming receiver for receiving messages with handlers.
*
* @param context - The client entity context.
* @param connectionContext - The client entity context.
* @param options - Options for how you'd like to connect.
*/
constructor(context: ConnectionContext, entityPath: string, options: ReceiveOptions) {
super(context, entityPath, "streaming", options);
constructor(connectionContext: ConnectionContext, entityPath: string, options: ReceiveOptions) {
super(connectionContext, entityPath, "streaming", options);
if (typeof options?.maxConcurrentCalls === "number" && options?.maxConcurrentCalls > 0) {
this.maxConcurrentCalls = options.maxConcurrentCalls;

Просмотреть файл

@ -476,7 +476,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
const receiveMessages = async (): Promise<ServiceBusReceivedMessage[]> => {
if (!this._batchingReceiver || !this._context.messageReceivers[this._batchingReceiver.name]) {
const options: ReceiveOptions = {
const receiveOptions: ReceiveOptions = {
maxConcurrentCalls: 0,
receiveMode: this.receiveMode,
lockRenewer: this._lockRenewer
@ -484,7 +484,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
this._batchingReceiver = this._createBatchingReceiver(
this._context,
this.entityPath,
options
receiveOptions
);
}
@ -572,7 +572,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
};
const peekOperationPromise = async (): Promise<ServiceBusReceivedMessage[]> => {
if (options.fromSequenceNumber) {
return await this._context
return this._context
.getManagementClient(this.entityPath)
.peekBySequenceNumber(
options.fromSequenceNumber,
@ -581,7 +581,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
managementRequestOptions
);
} else {
return await this._context
return this._context
.getManagementClient(this.entityPath)
.peek(maxMessageCount, managementRequestOptions);
}

Просмотреть файл

@ -329,19 +329,19 @@ function getKeyValuePairsOrUndefined(
if (Array.isArray(rawProperties)) {
for (const rawProperty of rawProperties) {
const key = rawProperty.Key;
const value = rawProperty.Value["_"];
const _value = rawProperty.Value["_"];
const encodedValueType = rawProperty.Value["$"]["i:type"].toString().substring(5);
if (
encodedValueType === TypeMapForResponseDeserialization.int ||
encodedValueType === TypeMapForResponseDeserialization.double
) {
properties[key] = Number(value);
properties[key] = Number(_value);
} else if (encodedValueType === TypeMapForResponseDeserialization.string) {
properties[key] = value;
properties[key] = _value;
} else if (encodedValueType === TypeMapForResponseDeserialization.boolean) {
properties[key] = value === "true" ? true : false;
properties[key] = _value === "true" ? true : false;
} else if (encodedValueType === TypeMapForResponseDeserialization.date) {
properties[key] = new Date(value);
properties[key] = new Date(_value);
} else {
throw new TypeError(
`Unable to parse the key-value pairs in the response - ${JSON.stringify(rawProperty)}`

Просмотреть файл

@ -352,14 +352,14 @@ export class MessageSession extends LinkEntity<Receiver> {
* to indicate we want the next unlocked non-empty session.
*/
constructor(
context: ConnectionContext,
connectionContext: ConnectionContext,
entityPath: string,
private _providedSessionId: string | undefined,
options?: MessageSessionOptions
) {
super(entityPath, entityPath, context, "session", logger, {
super(entityPath, entityPath, connectionContext, "session", logger, {
address: entityPath,
audience: `${context.config.endpoint}${entityPath}`
audience: `${connectionContext.config.endpoint}${entityPath}`
});
this._receiverHelper = new ReceiverHelper(() => ({
receiver: this.link,
@ -378,7 +378,7 @@ export class MessageSession extends LinkEntity<Receiver> {
this._isReceivingMessagesForSubscriber = false;
this._batchingReceiverLite = new BatchingReceiverLite(
context,
connectionContext,
entityPath,
async (_abortSignal?: AbortSignalLike): Promise<MinimalReceiver> => {
return this.link!;

Просмотреть файл

@ -103,10 +103,7 @@ describe("getSubscriptionRuntimeProperties", () => {
await serviceBusAtomManagementClient.deleteTopic(topicName);
});
async function receiveMessagesAndAbandon(
topicName: string,
subscriptionName: string
): Promise<void> {
async function receiveMessagesAndAbandon(subscriptionName: string): Promise<void> {
const receiver = serviceBusClient.createReceiver(topicName, subscriptionName);
const receivedMessages = await receiver.receiveMessages(10);
receivedMessages.forEach(async (msg) => {
@ -122,7 +119,7 @@ describe("getSubscriptionRuntimeProperties", () => {
};
});
await serviceBusClient.createSender(topicName).sendMessages(messages);
await receiveMessagesAndAbandon(topicName, subscriptionName1);
await receiveMessagesAndAbandon(subscriptionName1);
const activeMessageCount = (
await serviceBusAtomManagementClient.getSubscriptionRuntimeProperties(
@ -142,8 +139,8 @@ describe("getSubscriptionRuntimeProperties", () => {
};
});
await serviceBusClient.createSender(topicName).sendMessages(messages);
await receiveMessagesAndAbandon(topicName, subscriptionName1);
await receiveMessagesAndAbandon(topicName, subscriptionName2);
await receiveMessagesAndAbandon(subscriptionName1);
await receiveMessagesAndAbandon(subscriptionName2);
for await (const subscription of serviceBusAtomManagementClient.listSubscriptionsRuntimeProperties(
topicName

Просмотреть файл

@ -42,186 +42,160 @@ describe("Operation Options", () => {
}
it("getNamespaceProperties", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.getNamespaceProperties({
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.getNamespaceProperties({
abortSignal: AbortController.timeout(1)
})
);
});
it("createQueue", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.createQueue(entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.createQueue(entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getQueue", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.getQueue(entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.getQueue(entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("updateQueue", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.updateQueue({ name: entityName1 } as any, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.updateQueue({ name: entityName1 } as any, {
abortSignal: AbortController.timeout(1)
})
);
});
it("deleteQueue", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.deleteQueue(entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.deleteQueue(entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getQueueRuntimeProperties", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.getQueueRuntimeProperties(entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.getQueueRuntimeProperties(entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getQueues", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient["getQueues"]({
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient["getQueues"]({
abortSignal: AbortController.timeout(1)
})
);
});
it("getQueuesRuntimeProperties", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient["getQueuesRuntimeProperties"]({
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient["getQueuesRuntimeProperties"]({
abortSignal: AbortController.timeout(1)
})
);
});
it("createTopic", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.createTopic(entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.createTopic(entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getTopic", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.getTopic(entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.getTopic(entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("updateTopic", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.updateTopic({ name: entityName1 } as any, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.updateTopic({ name: entityName1 } as any, {
abortSignal: AbortController.timeout(1)
})
);
});
it("deleteTopic", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.deleteTopic(entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.deleteTopic(entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getTopicRuntimeProperties", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.getTopicRuntimeProperties(entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.getTopicRuntimeProperties(entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getTopics", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient["getTopics"]({
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient["getTopics"]({
abortSignal: AbortController.timeout(1)
})
);
});
it("getTopicsRuntimeProperties", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient["getTopicsRuntimeProperties"]({
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient["getTopicsRuntimeProperties"]({
abortSignal: AbortController.timeout(1)
})
);
});
it("createSubscription", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.createSubscription(entityName1, entityName2, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.createSubscription(entityName1, entityName2, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getSubscription", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.getSubscription(entityName1, entityName2, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.getSubscription(entityName1, entityName2, {
abortSignal: AbortController.timeout(1)
})
);
});
it("updateSubscription", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.updateSubscription(
{ topicName: entityName1, subscriptionName: entityName2 } as any,
{
abortSignal: AbortController.timeout(1)
}
)
await verifyAbortError(async () =>
serviceBusAtomManagementClient.updateSubscription(
{ topicName: entityName1, subscriptionName: entityName2 } as any,
{
abortSignal: AbortController.timeout(1)
}
)
);
});
it("deleteSubscription", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.deleteSubscription(entityName1, entityName2, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient.deleteSubscription(entityName1, entityName2, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getSubscriptionRuntimeProperties", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient.getSubscriptionRuntimeProperties(
entityName1,
entityName2,
{
abortSignal: AbortController.timeout(1)
}
)
await verifyAbortError(async () =>
serviceBusAtomManagementClient.getSubscriptionRuntimeProperties(entityName1, entityName2, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getSubscriptions", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient["getSubscriptions"](entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient["getSubscriptions"](entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
it("getSubscriptionsRuntimeProperties", async () => {
await verifyAbortError(
async () =>
await serviceBusAtomManagementClient["getSubscriptionsRuntimeProperties"](entityName1, {
abortSignal: AbortController.timeout(1)
})
await verifyAbortError(async () =>
serviceBusAtomManagementClient["getSubscriptionsRuntimeProperties"](entityName1, {
abortSignal: AbortController.timeout(1)
})
);
});
});

Просмотреть файл

@ -7,7 +7,7 @@ const assert = chai.assert;
import chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import { delay } from "rhea-promise";
import { checkWithTimeout,TestMessage } from "../public/utils/testUtils";
import { checkWithTimeout, TestMessage } from "../public/utils/testUtils";
import {
ServiceBusClientForTests,
createServiceBusClientForTests,
@ -52,21 +52,21 @@ describe("Message Lock Renewal", () => {
it(
testClientType + ": Batch Receiver: renewLock() resets lock duration each time.",
async function(): Promise<void> {
await testBatchReceiverManualLockRenewalHappyCase(sender);
await testBatchReceiverManualLockRenewalHappyCase();
}
);
it(
testClientType + ": Batch Receiver: complete() after lock expiry with throws error",
async function(): Promise<void> {
await testBatchReceiverManualLockRenewalErrorOnLockExpiry(sender);
await testBatchReceiverManualLockRenewalErrorOnLockExpiry();
}
);
it(
testClientType + ": Streaming Receiver: renewLock() resets lock duration each time.",
async function(): Promise<void> {
await testStreamingReceiverManualLockRenewalHappyCase(sender);
await testStreamingReceiverManualLockRenewalHappyCase();
}
);
@ -101,18 +101,18 @@ describe("Message Lock Renewal", () => {
await receiver.completeMessage(unprocessedMsg);
});
const receiveMethodType: ("subscribe" | "receive" | "iterator")[] = [
const receiveMethodTypes: ("subscribe" | "receive" | "iterator")[] = [
"iterator",
"subscribe",
"receive"
];
describe(`Using configurable renew durations`, () => {
receiveMethodType.forEach((receiveMethodType) => {
receiveMethodTypes.forEach((receiveMethodType) => {
it(`${testClientType}: [${receiveMethodType}] Streaming Receiver: complete() after lock expiry with auto-renewal disabled throws error`, async function(): Promise<
void
> {
await testAutoLockRenewalConfigBehavior(sender, receiveMethodType, {
await testAutoLockRenewalConfigBehavior(receiveMethodType, {
maxAutoRenewDurationInMs: 0,
delayBeforeAttemptingToCompleteMessageInSeconds: 31,
willCompleteFail: true
@ -120,11 +120,11 @@ describe("Message Lock Renewal", () => {
});
});
receiveMethodType.forEach((receiveMethodType) => {
receiveMethodTypes.forEach((receiveMethodType) => {
it(`${testClientType}: [${receiveMethodType}] : Streaming Receiver: lock will not expire until configured time`, async function(): Promise<
void
> {
await testAutoLockRenewalConfigBehavior(sender, receiveMethodType, {
await testAutoLockRenewalConfigBehavior(receiveMethodType, {
maxAutoRenewDurationInMs: 38 * 1000,
delayBeforeAttemptingToCompleteMessageInSeconds: 35,
willCompleteFail: false
@ -132,11 +132,11 @@ describe("Message Lock Renewal", () => {
});
});
receiveMethodType.forEach((receiveMethodType) => {
receiveMethodTypes.forEach((receiveMethodType) => {
it(`${testClientType}: [${receiveMethodType}] : Streaming Receiver: lock expires sometime after configured time`, async function(): Promise<
void
> {
await testAutoLockRenewalConfigBehavior(sender, receiveMethodType, {
await testAutoLockRenewalConfigBehavior(receiveMethodType, {
maxAutoRenewDurationInMs: 35 * 1000,
delayBeforeAttemptingToCompleteMessageInSeconds: 55,
willCompleteFail: true
@ -144,11 +144,11 @@ describe("Message Lock Renewal", () => {
}).timeout(95000 + 30000);
});
receiveMethodType.forEach((receiveMethodType) => {
receiveMethodTypes.forEach((receiveMethodType) => {
it(`${testClientType}: [${receiveMethodType}] Streaming Receiver: No lock renewal when config value is less than lock duration`, async function(): Promise<
void
> {
await testAutoLockRenewalConfigBehavior(sender, receiveMethodType, {
await testAutoLockRenewalConfigBehavior(receiveMethodType, {
maxAutoRenewDurationInMs: 15 * 1000,
delayBeforeAttemptingToCompleteMessageInSeconds: 31,
willCompleteFail: true
@ -168,9 +168,7 @@ describe("Message Lock Renewal", () => {
/**
* Test renewLock() after receiving a message using Batch Receiver
*/
async function testBatchReceiverManualLockRenewalHappyCase(
sender: ServiceBusSender
): Promise<void> {
async function testBatchReceiverManualLockRenewalHappyCase(): Promise<void> {
const receiver = await serviceBusClient.test.createPeekLockReceiver(autoGeneratedEntity, {
maxAutoLockRenewalDurationInMs: 0
});
@ -219,9 +217,7 @@ describe("Message Lock Renewal", () => {
/**
* Test settling of message from Batch Receiver fails after message lock expires
*/
async function testBatchReceiverManualLockRenewalErrorOnLockExpiry(
sender: ServiceBusSender
): Promise<void> {
async function testBatchReceiverManualLockRenewalErrorOnLockExpiry(): Promise<void> {
const receiver = await serviceBusClient.test.createPeekLockReceiver(autoGeneratedEntity, {
maxAutoLockRenewalDurationInMs: 0
});
@ -256,9 +252,7 @@ describe("Message Lock Renewal", () => {
/**
* Test renewLock() after receiving a message using Streaming Receiver with autoLockRenewal disabled
*/
async function testStreamingReceiverManualLockRenewalHappyCase(
sender: ServiceBusSender
): Promise<void> {
async function testStreamingReceiverManualLockRenewalHappyCase(): Promise<void> {
const receiver = await serviceBusClient.test.createPeekLockReceiver(autoGeneratedEntity, {
maxAutoLockRenewalDurationInMs: 0
});
@ -335,7 +329,6 @@ describe("Message Lock Renewal", () => {
}
async function testAutoLockRenewalConfigBehavior(
sender: ServiceBusSender,
type: "subscribe" | "receive" | "iterator",
options: AutoLockRenewalTestOptions
): Promise<void> {
@ -394,7 +387,7 @@ describe("Message Lock Renewal", () => {
): Promise<ServiceBusReceivedMessage> {
switch (type) {
case "subscribe": {
return await new Promise((resolve, reject) => {
return new Promise((resolve, reject) => {
receiver.subscribe(
{
processMessage: async (msg) => resolve(msg),

Просмотреть файл

@ -401,7 +401,7 @@ describe("Send Batch", () => {
});
const maxSizeInBytes = 30000000;
async function testSendBatch(maxSizeInBytes?: number): Promise<void> {
async function testSendBatch(): Promise<void> {
let errorIsThrown = false;
try {
await sender.createMessageBatch({ maxSizeInBytes });
@ -423,12 +423,12 @@ describe("Send Batch", () => {
it(`${noSessionTestClientType}: SendBatch`, async function(): Promise<void> {
await beforeEachTest(noSessionTestClientType);
await testSendBatch(maxSizeInBytes);
await testSendBatch();
});
it(`${withSessionTestClientType}: SendBatch`, async function(): Promise<void> {
await beforeEachTest(withSessionTestClientType);
await testSendBatch(maxSizeInBytes);
await testSendBatch();
});
});

Просмотреть файл

@ -37,11 +37,9 @@ const noSessionTestClientType = getRandomTestClientTypeWithNoSessions();
const withSessionTestClientType = getRandomTestClientTypeWithSessions();
describe("ServiceBusClient live tests", () => {
let sbClient: ServiceBusClient;
describe("Create ServiceBusClient", function(): void {
it("hostname gets populated from the connection string", function(): void {
sbClient = new ServiceBusClient(
const sbClient = new ServiceBusClient(
"Endpoint=sb://a;SharedAccessKeyName=b;SharedAccessKey=c;EntityPath=d"
);
sbClient.should.be.an.instanceof(ServiceBusClient);

Просмотреть файл

@ -3,7 +3,13 @@
import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { ServiceBusReceivedMessage, delay, ProcessErrorArgs, ServiceBusReceiver, ServiceBusSender } from "../../src";
import {
ServiceBusReceivedMessage,
delay,
ProcessErrorArgs,
ServiceBusReceiver,
ServiceBusSender
} from "../../src";
import { getAlreadyReceivingErrorMsg, MessageAlreadySettled } from "../../src/util/errors";
import { TestMessage, checkWithTimeout, TestClientType } from "../public/utils/testUtils";
import { DispositionType, ServiceBusMessageImpl } from "../../src/serviceBusMessage";
@ -13,7 +19,7 @@ import {
createServiceBusClientForTests,
drainReceiveAndDeleteReceiver,
testPeekMsgsLength,
getRandomTestClientTypeWithNoSessions,
getRandomTestClientTypeWithNoSessions
} from "../public/utils/testutils2";
import { getDeliveryProperty } from "./utils/misc";
import { verifyMessageCount } from "../public/utils/managementUtils";
@ -65,7 +71,7 @@ describe("Streaming Receiver Tests", () => {
unexpectedError = undefined;
}
describe(testClientType + ": Streaming - Misc Tests", function (): void {
describe(testClientType + ": Streaming - Misc Tests", function(): void {
afterEach(async () => {
return serviceBusClient.test.afterEach();
});
@ -74,7 +80,7 @@ describe("Streaming Receiver Tests", () => {
await beforeEachTest();
});
it("AutoComplete removes the message", async function (): Promise<void> {
it("AutoComplete removes the message", async function(): Promise<void> {
const testMessage = TestMessage.getSample();
await sender.sendMessages(testMessage);
@ -91,7 +97,7 @@ describe("Streaming Receiver Tests", () => {
return Promise.resolve();
},
processError,
processError
});
const msgsCheck = await checkWithTimeout(
@ -113,7 +119,9 @@ describe("Streaming Receiver Tests", () => {
await testPeekMsgsLength(receiver, 0);
});
it("Disabled autoComplete, no manual complete retains the message", async function (): Promise<void> {
it("Disabled autoComplete, no manual complete retains the message", async function(): Promise<
void
> {
const testMessage = TestMessage.getSample();
await sender.sendMessages(testMessage);
@ -130,7 +138,7 @@ describe("Streaming Receiver Tests", () => {
);
return Promise.resolve();
},
processError,
processError
},
{ autoCompleteMessages: false }
);
@ -152,45 +160,45 @@ describe("Streaming Receiver Tests", () => {
TestClientType.UnpartitionedQueue
);
const sender = await serviceBusClient.test.createSender(entities);
const receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entities);
const sender2 = await serviceBusClient.test.createSender(entities);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entities);
await sender.sendMessages({
body: "can stop and start a subscription message 1",
await sender2.sendMessages({
body: "can stop and start a subscription message 1"
});
const { subscription, msg } = await new Promise<{
const { subscription, msg: receivedMsg } = await new Promise<{
subscription: { close(): Promise<void> };
msg: string;
}>((resolve, reject) => {
const subscription = receiver.subscribe({
const subscription = receiver2.subscribe({
processMessage: async (msg) => {
resolve({ subscription, msg: msg.body });
},
processError: async (err) => {
reject(err);
},
}
});
});
msg.should.equal("can stop and start a subscription message 1");
receivedMsg.should.equal("can stop and start a subscription message 1");
await subscription.close();
await sender.sendMessages({
body: "can stop and start a subscription message 2",
await sender2.sendMessages({
body: "can stop and start a subscription message 2"
});
const { subscription: subscription2, msg: msg2 } = await new Promise<{
subscription: { close(): Promise<void> };
msg: string;
}>((resolve, reject) => {
const subscription = receiver.subscribe({
const subscription = receiver2.subscribe({
processMessage: async (msg) => {
resolve({ subscription, msg: msg.body });
},
processError: async (err) => {
reject(err);
},
}
});
});
@ -199,7 +207,7 @@ describe("Streaming Receiver Tests", () => {
});
});
describe(testClientType + ": Streaming - Complete message", function (): void {
describe(testClientType + ": Streaming - Complete message", function(): void {
afterEach(async () => {
return serviceBusClient.test.afterEach();
});
@ -225,7 +233,7 @@ describe("Streaming Receiver Tests", () => {
await receiver.completeMessage(msg);
receivedMsgs.push(msg);
},
processError,
processError
},
{ autoCompleteMessages: autoComplete }
);
@ -237,16 +245,16 @@ describe("Streaming Receiver Tests", () => {
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
await testPeekMsgsLength(receiver, 0);
}
it("complete() removes message", async function (): Promise<void> {
it("complete() removes message", async function(): Promise<void> {
await testComplete(false);
});
it("with autoComplete: complete() removes message", async function (): Promise<void> {
it("with autoComplete: complete() removes message", async function(): Promise<void> {
await testComplete(true);
});
});
describe(testClientType + ": Streaming - Abandon message", function (): void {
describe(testClientType + ": Streaming - Abandon message", function(): void {
afterEach(async () => {
return serviceBusClient.test.afterEach();
});
@ -255,7 +263,7 @@ describe("Streaming Receiver Tests", () => {
await beforeEachTest();
});
it("Multiple abandons until maxDeliveryCount", async function (): Promise<void> {
it("Multiple abandons until maxDeliveryCount", async function(): Promise<void> {
const testMessage = TestMessage.getSample();
await sender.sendMessages(testMessage);
@ -272,7 +280,7 @@ describe("Streaming Receiver Tests", () => {
await receiver.abandonMessage(msg);
checkDeliveryCount++;
},
processError,
processError
},
{ autoCompleteMessages: false }
);
@ -306,7 +314,7 @@ describe("Streaming Receiver Tests", () => {
});
});
describe(testClientType + ": Streaming - Defer message", function (): void {
describe(testClientType + ": Streaming - Defer message", function(): void {
afterEach(async () => {
return serviceBusClient.test.afterEach();
});
@ -326,7 +334,7 @@ describe("Streaming Receiver Tests", () => {
await receiver.deferMessage(msg);
sequenceNum = msg.sequenceNumber;
},
processError,
processError
},
{ autoCompleteMessages: autoComplete }
);
@ -359,16 +367,18 @@ describe("Streaming Receiver Tests", () => {
await testPeekMsgsLength(receiver, 0);
}
it("defer() moves message to deferred queue", async function (): Promise<void> {
it("defer() moves message to deferred queue", async function(): Promise<void> {
await testDefer(false);
});
it("with autoComplete: defer() moves message to deferred queue", async function (): Promise<void> {
it("with autoComplete: defer() moves message to deferred queue", async function(): Promise<
void
> {
await testDefer(true);
});
});
describe(testClientType + ": Streaming - Deadletter message", function (): void {
describe(testClientType + ": Streaming - Deadletter message", function(): void {
afterEach(async () => {
return serviceBusClient.test.afterEach();
});
@ -389,7 +399,7 @@ describe("Streaming Receiver Tests", () => {
await receiver.deadLetterMessage(msg);
receivedMsgs.push(msg);
},
processError,
processError
},
{ autoCompleteMessages: autoComplete }
);
@ -414,16 +424,18 @@ describe("Streaming Receiver Tests", () => {
await testPeekMsgsLength(deadLetterReceiver, 0);
}
it("deadLetter() moves message to deadletter queue", async function (): Promise<void> {
it("deadLetter() moves message to deadletter queue", async function(): Promise<void> {
await testDeadletter(false);
});
it("with autoComplete: deadLetter() moves message to deadletter queue", async function (): Promise<void> {
it("with autoComplete: deadLetter() moves message to deadletter queue", async function(): Promise<
void
> {
await testDeadletter(true);
});
});
describe(testClientType + ": Streaming - Multiple Receiver Operations", function (): void {
describe(testClientType + ": Streaming - Multiple Receiver Operations", function(): void {
afterEach(async () => {
return serviceBusClient.test.afterEach();
});
@ -436,7 +448,7 @@ describe("Streaming Receiver Tests", () => {
async processMessage(msg: ServiceBusReceivedMessage) {
await receiver.completeMessage(msg);
},
processError,
processError
});
await delay(5000);
try {
@ -444,7 +456,7 @@ describe("Streaming Receiver Tests", () => {
async processMessage() {
return Promise.resolve();
},
processError,
processError
});
} catch (err) {
errorMessage = err && err.message;
@ -469,7 +481,9 @@ describe("Streaming Receiver Tests", () => {
);
}
it("Second receive operation should fail if the first streaming receiver is not stopped", async function (): Promise<void> {
it("Second receive operation should fail if the first streaming receiver is not stopped", async function(): Promise<
void
> {
await beforeEachTest();
await testMultipleReceiveCalls();
});
@ -498,7 +512,7 @@ describe("Streaming Receiver Tests", () => {
receivedMsgs.push(msg);
return Promise.resolve();
},
processError,
processError
});
const msgsCheck = await checkWithTimeout(
@ -542,24 +556,24 @@ describe("Streaming Receiver Tests", () => {
should.equal(errorWasThrown, true, "Error thrown flag must be true");
}
it("complete() throws error", async function (): Promise<void> {
it("complete() throws error", async function(): Promise<void> {
await testSettlement(DispositionType.complete);
});
it("abandon() throws error", async function (): Promise<void> {
it("abandon() throws error", async function(): Promise<void> {
await testSettlement(DispositionType.abandon);
});
it("defer() throws error", async function (): Promise<void> {
it("defer() throws error", async function(): Promise<void> {
await testSettlement(DispositionType.defer);
});
it("deadLetter() throws error", async function (): Promise<void> {
it("deadLetter() throws error", async function(): Promise<void> {
await testSettlement(DispositionType.deadletter);
});
});
describe("Streaming - User Error", function (): void {
describe("Streaming - User Error", function(): void {
afterEach(async () => {
return serviceBusClient.test.afterEach();
});
@ -576,7 +590,7 @@ describe("Streaming Receiver Tests", () => {
receivedMsgs.push(msg);
throw new Error(errorMessage);
},
processError,
processError
});
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
@ -596,13 +610,12 @@ describe("Streaming Receiver Tests", () => {
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
}
it(
testClientType + ": onError handler is called for user error",
async function (): Promise<void> {
await beforeEachTest();
await testUserError();
}
);
it(testClientType + ": onError handler is called for user error", async function(): Promise<
void
> {
await beforeEachTest();
await testUserError();
});
});
// describe("Streaming - Failed init should not cache receiver", function(): void {
@ -678,7 +691,7 @@ describe("Streaming Receiver Tests", () => {
// });
// });
describe(testClientType + ": Streaming - maxConcurrentCalls", function (): void {
describe(testClientType + ": Streaming - maxConcurrentCalls", function(): void {
afterEach(async () => {
return serviceBusClient.test.afterEach();
});
@ -720,7 +733,7 @@ describe("Streaming Receiver Tests", () => {
await receiver.completeMessage(msg);
settledMsgs.push(msg);
},
processError,
processError
},
maxConcurrentCalls ? { maxConcurrentCalls } : {}
);
@ -730,20 +743,20 @@ describe("Streaming Receiver Tests", () => {
should.equal(settledMsgs.length, 2, `Expected 2, received ${settledMsgs.length} messages.`);
}
it("no maxConcurrentCalls passed", async function (): Promise<void> {
it("no maxConcurrentCalls passed", async function(): Promise<void> {
await testConcurrency();
});
it("pass 1 for maxConcurrentCalls", async function (): Promise<void> {
it("pass 1 for maxConcurrentCalls", async function(): Promise<void> {
await testConcurrency(1);
});
it("pass 2 for maxConcurrentCalls", async function (): Promise<void> {
it("pass 2 for maxConcurrentCalls", async function(): Promise<void> {
await testConcurrency(2);
});
});
describe("Streaming - Not receive messages after receiver is closed", function (): void {
describe("Streaming - Not receive messages after receiver is closed", function(): void {
async function testReceiveMessages(): Promise<void> {
const totalNumOfMessages = 5;
let num = 1;
@ -754,7 +767,7 @@ describe("Streaming Receiver Tests", () => {
messageId: num,
body: "test",
label: `${num}`,
partitionKey: "dummy", // Ensures all messages go to same partition to make peek work reliably
partitionKey: "dummy" // Ensures all messages go to same partition to make peek work reliably
};
num++;
messages.push(message);
@ -770,10 +783,10 @@ describe("Streaming Receiver Tests", () => {
receivedMsgs.push(brokeredMessage);
await receiver.completeMessage(brokeredMessage);
},
processError,
processError
},
{
autoCompleteMessages: false,
autoCompleteMessages: false
}
);
await receiver.close();
@ -797,7 +810,7 @@ describe("Streaming Receiver Tests", () => {
it(
testClientType + ": Not receive messages after receiver is closed",
async function (): Promise<void> {
async function(): Promise<void> {
await beforeEachTest();
await testReceiveMessages();
}
@ -805,7 +818,7 @@ describe("Streaming Receiver Tests", () => {
it(
testClientType + ": (Receive And Delete mode) Not receive messages after receiver is closed",
async function (): Promise<void> {
async function(): Promise<void> {
await beforeEachTest("receiveAndDelete");
await testReceiveMessages();
}
@ -819,9 +832,9 @@ describe("Streaming Receiver Tests", () => {
const actualReceiver = await serviceBusClient.test.createPeekLockReceiver(entities);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entities);
const sender = await serviceBusClient.test.createSender(entities);
const sender2 = await serviceBusClient.test.createSender(entities);
await sender.sendMessages({ body: ".close() test - first message" });
await sender2.sendMessages({ body: ".close() test - first message" });
const { subscriber, messages } = await singleMessagePromise(actualReceiver);
@ -835,8 +848,8 @@ describe("Streaming Receiver Tests", () => {
await actualReceiver.completeMessage(messages[0]);
messages.pop();
await sender.sendMessages({
body: ".close test - second message, after closing",
await sender2.sendMessages({
body: ".close test - second message, after closing"
});
// the subscription is closed so no messages should be received here.
@ -853,7 +866,7 @@ describe("Streaming Receiver Tests", () => {
);
});
describe(testClientType + ": Streaming - disconnects", function (): void {
describe(testClientType + ": Streaming - disconnects", function(): void {
let serviceBusClient: ServiceBusClientForTests;
let sender: ServiceBusSender;
let receiver: ServiceBusReceiver;
@ -878,7 +891,7 @@ describe(testClientType + ": Streaming - disconnects", function (): void {
await serviceBusClient.test.afterEach();
});
it("can receive and settle messages after a disconnect", async function (): Promise<void> {
it("can receive and settle messages after a disconnect", async function(): Promise<void> {
/**
* If onDetached is called with a non-retryable error, it is assumed that
* the onSessionError or onAmqpError has already called the user's
@ -919,7 +932,7 @@ describe(testClientType + ": Streaming - disconnects", function (): void {
}
}
},
processError: processErrorFake,
processError: processErrorFake
});
// Wait until we're sure the receiver is open and receiving messages.
@ -931,7 +944,7 @@ describe(testClientType + ": Streaming - disconnects", function (): void {
const connectionContext = (receiver as any)["_context"];
const refreshConnection = connectionContext.refreshConnection;
let refreshConnectionCalled = 0;
connectionContext.refreshConnection = function (...args: any) {
connectionContext.refreshConnection = function(...args: any) {
refreshConnectionCalled++;
refreshConnection.apply(this, args);
};
@ -1028,10 +1041,10 @@ export function singleMessagePromise(
},
processError: async (err) => {
reject(err);
},
}
},
{
autoCompleteMessages: false,
autoCompleteMessages: false
}
);
});

Просмотреть файл

@ -11,7 +11,7 @@ import {
ServiceBusSender
} from "../../src";
import { getAlreadyReceivingErrorMsg, MessageAlreadySettled } from "../../src/util/errors";
import { TestClientType, TestMessage, checkWithTimeout } from "../public/utils/testUtils";
import { TestMessage, checkWithTimeout } from "../public/utils/testUtils";
import { DispositionType } from "../../src/serviceBusMessage";
import {
ServiceBusSessionReceiver,
@ -57,7 +57,7 @@ describe("Streaming with sessions", () => {
async function beforeEachTest(
receiveMode?: "peekLock" | "receiveAndDelete"
): Promise<EntityName> {
const entityNames = await createReceiverForTests(testClientType, receiveMode);
const entityNames = await createReceiverForTests(receiveMode);
sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
@ -71,7 +71,6 @@ describe("Streaming with sessions", () => {
}
async function createReceiverForTests(
testClientType: TestClientType,
receiveMode?: "peekLock" | "receiveAndDelete"
): Promise<AutoGeneratedEntity> {
const entityNames = await serviceBusClient.test.createTestEntities(testClientType);
@ -105,8 +104,8 @@ describe("Streaming with sessions", () => {
async () => {
const entities = await serviceBusClient.test.createTestEntities(testClientType);
const sender = await serviceBusClient.test.createSender(entities);
await sender.sendMessages({
const sender2 = await serviceBusClient.test.createSender(entities);
await sender2.sendMessages({
body: ".close() test - first message",
sessionId: TestMessage.sessionId
});
@ -127,7 +126,7 @@ describe("Streaming with sessions", () => {
await actualReceiver.completeMessage(messages[0]);
messages.pop();
await sender.sendMessages({
await sender2.sendMessages({
body: ".close test - second message, after closing",
sessionId: TestMessage.sessionId
});
@ -286,12 +285,9 @@ describe("Streaming with sessions", () => {
});
async function eachTest(autoComplete: boolean): Promise<void> {
await beforeEachTest();
await testAbandon(testClientType, autoComplete);
await testAbandon(autoComplete);
}
async function testAbandon(
testClientType: TestClientType,
autoComplete: boolean
): Promise<void> {
async function testAbandon(autoComplete: boolean): Promise<void> {
const testMessage = TestMessage.getSessionSample();
await sender.sendMessages(testMessage);
let abandonFlag = 0;
@ -321,7 +317,7 @@ describe("Streaming with sessions", () => {
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
await createReceiverForTests(testClientType);
await createReceiverForTests();
const receivedMsgs = await receiver.receiveMessages(1);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

Просмотреть файл

@ -228,28 +228,28 @@ describe("autoLockRenewer unit tests", () => {
};
it("doesn't support receiveAndDelete mode", () => {
const autoLockRenewer = LockRenewer.create(
const autoLockRenewer2 = LockRenewer.create(
unusedMgmtClient,
1, // this is okay,
"receiveAndDelete" // this is not okay - there aren't any locks to renew in receiveAndDelete mode.
);
assert.notExists(
autoLockRenewer,
autoLockRenewer2,
"Shouldn't create an autolockRenewer in receiveAndDelete mode"
);
});
[0, -1].forEach((invalidMaxAutoRenewLockDurationInMs) => {
it(`Invalid maxAutoRenewLockDurationInMs duration: ${invalidMaxAutoRenewLockDurationInMs}`, () => {
const autoLockRenewer = LockRenewer.create(
const autoLockRenewer2 = LockRenewer.create(
unusedMgmtClient,
invalidMaxAutoRenewLockDurationInMs,
"peekLock" // this is okay
);
assert.notExists(
autoLockRenewer,
autoLockRenewer2,
"Shouldn't create an autolockRenewer when the auto lock duration is invalid"
);
});

Просмотреть файл

@ -224,10 +224,10 @@ describe("Listing methods - PagedAsyncIterableIterator", function(): void {
// In case the namespace has too many entities and the newly created entities were not recovered
if (marker) {
for await (const response of getIter().byPage({
for await (const pageResponse of getIter().byPage({
continuationToken: marker
})) {
for (const entity of response) {
for (const entity of pageResponse) {
receivedEntities.push(
methodName.includes("Subscription") ? entity.subscriptionName : entity.name
);

Просмотреть файл

@ -71,7 +71,7 @@ describe("Session Lock Renewal", () => {
testClientType + ": Batch Receiver: renewLock() resets lock duration each time",
async function(): Promise<void> {
await beforeEachTest(0);
await testBatchReceiverManualLockRenewalHappyCase(sender, receiver);
await testBatchReceiverManualLockRenewalHappyCase();
}
);
@ -131,10 +131,7 @@ describe("Session Lock Renewal", () => {
/**
* Test manual renewLock() using Batch Receiver, with autoLockRenewal disabled
*/
async function testBatchReceiverManualLockRenewalHappyCase(
sender: ServiceBusSender,
receiver: ServiceBusSessionReceiver
): Promise<void> {
async function testBatchReceiverManualLockRenewalHappyCase(): Promise<void> {
const testMessage = getTestMessage();
testMessage.body = `testBatchReceiverManualLockRenewalHappyCase-${Date.now().toString()}`;
await sender.sendMessages(testMessage);