[Service Bus] Minor updates to stress tests (#12561)

This PR adds 
- a new option `sendAllMessagesBeforeReceiveStarts` which would be useful if we want to receive a ton of messages in the queue.
- tracking more info related to the messages
- other minor fixes
This commit is contained in:
Harsha Nalluru 2021-01-20 16:04:39 -08:00 коммит произвёл GitHub
Родитель 99135cc224
Коммит 28a534f8cd
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 38 добавлений и 11 удалений

Просмотреть файл

@ -19,14 +19,19 @@ interface ScenarioReceiveBatchOptions {
numberOfMessagesPerSend?: number;
delayBetweenSendsInMs?: number;
totalNumberOfMessagesToSend?: number;
/**
* If set to true, `totalNumberOfMessagesToSend` number of messages will be sent before triggering receive.
*/
sendAllMessagesBeforeReceiveStarts?: boolean;
numberOfParallelSends?: number;
maxAutoLockRenewalDurationInMs?: number;
settleMessageOnReceive: boolean;
}
function sanitizeOptions(args: string[]): Required<ScenarioReceiveBatchOptions> {
const options = parsedArgs<ScenarioReceiveBatchOptions>(args, {
boolean: ["settleMessageOnReceive"],
default: { settleMessageOnReceive: false }
boolean: ["settleMessageOnReceive", "sendAllMessagesBeforeReceiveStarts"],
default: { settleMessageOnReceive: false, sendAllMessagesBeforeReceiveStarts: false }
});
return {
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
@ -37,14 +42,16 @@ function sanitizeOptions(args: string[]): Required<ScenarioReceiveBatchOptions>
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0,
totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity,
sendAllMessagesBeforeReceiveStarts: options.sendAllMessagesBeforeReceiveStarts,
maxAutoLockRenewalDurationInMs: options.maxAutoLockRenewalDurationInMs || 0, // 0 = disabled
settleMessageOnReceive: options.settleMessageOnReceive
settleMessageOnReceive: options.settleMessageOnReceive,
numberOfParallelSends: options.numberOfParallelSends || 5
};
}
export async function scenarioReceiveBatch() {
const testOptions = sanitizeOptions(process.argv);
const {
let {
testDurationInMs,
receiveMode,
receiveBatchMaxMessageCount,
@ -54,7 +61,9 @@ export async function scenarioReceiveBatch() {
delayBetweenSendsInMs,
totalNumberOfMessagesToSend,
maxAutoLockRenewalDurationInMs,
settleMessageOnReceive
settleMessageOnReceive,
sendAllMessagesBeforeReceiveStarts,
numberOfParallelSends
} = testOptions;
// Sending stops after 70% of total duration to give the receiver a chance to clean up and receive all the messages
@ -86,7 +95,10 @@ export async function scenarioReceiveBatch() {
elapsedTime < testDurationForSendInMs &&
stressBase.messagesSent.length < totalNumberOfMessagesToSend
) {
await stressBase.sendMessages([sender], numberOfMessagesPerSend);
await stressBase.sendMessages(
new Array(numberOfParallelSends).fill(sender),
numberOfMessagesPerSend
);
elapsedTime = new Date().valueOf() - startedAt.valueOf();
await delay(delayBetweenSendsInMs);
}
@ -106,7 +118,12 @@ export async function scenarioReceiveBatch() {
}
}
await Promise.all([sendMessages(), receiveMessages()]);
if (sendAllMessagesBeforeReceiveStarts) {
await sendMessages();
}
await Promise.all(
(!sendAllMessagesBeforeReceiveStarts ? [sendMessages()] : []).concat(receiveMessages())
);
await sbClient.close();
await stressBase.end();

Просмотреть файл

@ -25,7 +25,7 @@ function sanitizeOptions(args: string[]): Required<ScenarioCloseOptions> {
default: { shouldCreateNewClientEachTime: true }
});
return {
testDurationInMs: options.testDurationInMs || 60 * 1000, // Default = 60 minutes
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
receiveBatchMaxMessageCount: options.receiveBatchMaxMessageCount || 10,
receiveBatchMaxWaitTimeInMs: options.receiveBatchMaxWaitTimeInMs || 10000,
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,

Просмотреть файл

@ -80,13 +80,13 @@ export class SBStressTestsBase {
options?: CreateQueueOptions | undefined,
testOptions?: Record<string, string | number | boolean>
) {
this.queueName =
(!queueNamePrefix ? `queue` : queueNamePrefix) + `-${Math.ceil(Math.random() * 100000)}`;
this.reportFileName = `temp/report-${this.queueName}.txt`;
this.errorsFileName = `temp/errors-${this.queueName}.txt`;
this.messagesReportFileName = `temp/messages-${this.queueName}.json`;
if (testOptions) console.log(testOptions);
await appendFile(this.reportFileName, JSON.stringify(testOptions, null, 2));
this.queueName =
(!queueNamePrefix ? `queue` : queueNamePrefix) + `-${Math.ceil(Math.random() * 100000)}`;
await this.serviceBusAdministrationClient.createQueue(this.queueName, options);
}

Просмотреть файл

@ -67,7 +67,9 @@ export async function saveDiscrepanciesFromTrackedMessages(
const output = {
messages_sent_but_never_received: [],
messages_not_sent_but_received: [],
messages_sent_multiple_times: []
messages_sent_multiple_times: [],
messages_sent_once_but_received_multiple_times: [],
messages_sent_once_and_received_once: []
};
for (const id in trackedMessageIds) {
if (trackedMessageIds[id].sentCount <= 0) {
@ -82,6 +84,14 @@ export async function saveDiscrepanciesFromTrackedMessages(
// Message was sent multiple times
output.messages_sent_multiple_times.push(id);
}
if (trackedMessageIds[id].sentCount === 1 && trackedMessageIds[id].receivedCount > 1) {
// Message was sent once but received multiple times
output.messages_sent_once_but_received_multiple_times.push(id);
}
if (trackedMessageIds[id].sentCount === 1 && trackedMessageIds[id].receivedCount === 1) {
// Message was sent once and received once
output.messages_sent_once_and_received_once.push(id);
}
}
await writeFile(fileName, JSON.stringify(output));