From 2e56a4f08dce6c4ab5601957c4c9af9fb8709c3a Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Thu, 27 May 2021 14:38:36 -0700 Subject: [PATCH] [service-bus] Updating samples to fix a few issues: (#15430) Updating samples to fix a few issues: - Show the right way to restart your receiver when using sessions (which don't do automatic retries like non-sessions). - Some of the samples printed almost too little console output to indicate what they were doing or didn't demonstrate the right techniques (asking for multiple messages at a time, etc..). Those have been updated as well. Fixes #14531 --- .../service-bus/samples-dev/browseMessages.ts | 18 ++++--- .../samples-dev/receiveMessagesLoop.ts | 24 +++++++-- .../service-bus/samples-dev/sendMessages.ts | 21 +++++--- .../service-bus/samples-dev/session.ts | 52 ++++++++++++++----- .../samples/v7/javascript/browseMessages.js | 18 ++++--- .../v7/javascript/receiveMessagesLoop.js | 24 +++++++-- .../samples/v7/javascript/sendMessages.js | 17 +++--- .../samples/v7/javascript/session.js | 52 ++++++++++++++----- .../v7/typescript/src/browseMessages.ts | 18 ++++--- .../v7/typescript/src/receiveMessagesLoop.ts | 24 +++++++-- .../samples/v7/typescript/src/sendMessages.ts | 21 +++++--- .../samples/v7/typescript/src/session.ts | 52 ++++++++++++++----- 12 files changed, 244 insertions(+), 97 deletions(-) diff --git a/sdk/servicebus/service-bus/samples-dev/browseMessages.ts b/sdk/servicebus/service-bus/samples-dev/browseMessages.ts index 60d48f3a6d6..df05538d05d 100644 --- a/sdk/servicebus/service-bus/samples-dev/browseMessages.ts +++ b/sdk/servicebus/service-bus/samples-dev/browseMessages.ts @@ -29,14 +29,18 @@ export async function main() { const queueReceiver = sbClient.createReceiver(queueName); try { - for (let i = 0; i < 20; i++) { - const [message] = await queueReceiver.peekMessages(1); - if (!message) { - console.log("No more messages to peek"); - break; - } - console.log(`Peeking message #${i}: ${message.body}`); + // peeking messages does not lock or remove messages from a queue or subscription. + // For locking and/or removal, look at the `receiveMessagesLoop` or `receiveMessagesStreaming` samples, + // which cover using a receiver with a `receiveMode`. + console.log(`Attempting to peek 10 messages at a time`); + const peekedMessages = await queueReceiver.peekMessages(10); + + console.log(`Got ${peekedMessages.length} messages.`); + + for (let i = 0; i < peekedMessages.length; ++i) { + console.log(`Peeked message #${i}: ${peekedMessages[i].body}`); } + await queueReceiver.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples-dev/receiveMessagesLoop.ts b/sdk/servicebus/service-bus/samples-dev/receiveMessagesLoop.ts index 7ca72df4a49..4ec75076cf4 100644 --- a/sdk/servicebus/service-bus/samples-dev/receiveMessagesLoop.ts +++ b/sdk/servicebus/service-bus/samples-dev/receiveMessagesLoop.ts @@ -31,9 +31,15 @@ export async function main() { // To receive messages from sessions, use getSessionReceiver instead of getReceiver or look at // the sample in sessions.ts file try { - for (let i = 0; i < 10; i++) { - const messages = await queueReceiver.receiveMessages(1, { - maxWaitTimeInMs: 5000 + let allMessages = []; + + console.log(`Receiving 10 messages...`); + + while (allMessages.length < 10) { + // NOTE: asking for 10 messages does not guarantee that we will return + // all 10 at once so we must loop until we get all the messages we expected. + const messages = await queueReceiver.receiveMessages(10, { + maxWaitTimeInMs: 60 * 1000 }); if (!messages.length) { @@ -41,9 +47,17 @@ export async function main() { break; } - console.log(`Received message #${i}: ${messages[0].body}`); - await queueReceiver.completeMessage(messages[0]); + console.log(`Received ${messages.length} messages`); + allMessages.push(...messages); + + for (let message of messages) { + console.log(` Message: '${message.body}'`); + + // completing the message will remove it from the remote queue or subscription. + await queueReceiver.completeMessage(message); + } } + await queueReceiver.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples-dev/sendMessages.ts b/sdk/servicebus/service-bus/samples-dev/sendMessages.ts index 26b26cdc877..a276af94685 100644 --- a/sdk/servicebus/service-bus/samples-dev/sendMessages.ts +++ b/sdk/servicebus/service-bus/samples-dev/sendMessages.ts @@ -14,7 +14,7 @@ * @azsdk-weight 100 */ -import { ServiceBusClient, ServiceBusMessage } from "@azure/service-bus"; +import { ServiceBusClient, ServiceBusMessage, ServiceBusMessageBatch } from "@azure/service-bus"; // Load the .env file if it exists import * as dotenv from "dotenv"; @@ -24,12 +24,15 @@ dotenv.config(); const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; -const messages: ServiceBusMessage[] = [ +const firstSetOfMessages: ServiceBusMessage[] = [ { body: "Albert Einstein" }, { body: "Werner Heisenberg" }, { body: "Marie Curie" }, { body: "Steven Hawking" }, - { body: "Isaac Newton" }, + { body: "Isaac Newton" } +]; + +const secondSetOfMessages: ServiceBusMessage[] = [ { body: "Niels Bohr" }, { body: "Michael Faraday" }, { body: "Galileo Galilei" }, @@ -46,27 +49,29 @@ export async function main() { try { // Tries to send all messages in a single batch. // Will fail if the messages cannot fit in a batch. - await sender.sendMessages(messages); + console.log(`Sending the first 5 scientists (as an array)`); + await sender.sendMessages(firstSetOfMessages); // Sends all messages using one or more ServiceBusMessageBatch objects as required - let batch = await sender.createMessageBatch(); + let batch: ServiceBusMessageBatch = await sender.createMessageBatch(); - for (let i = 0; i < messages.length; i++) { - const message = messages[i]; + for (const message of secondSetOfMessages) { if (!batch.tryAddMessage(message)) { // Send the current batch as it is full and create a new one await sender.sendMessages(batch); batch = await sender.createMessageBatch(); - if (!batch.tryAddMessage(messages[i])) { + if (!batch.tryAddMessage(message)) { throw new Error("Message too big to fit in a batch"); } } } // Send the batch + console.log(`Sending the last 5 scientists (as a ServiceBusMessageBatch)`); await sender.sendMessages(batch); // Close the sender + console.log(`Done sending, closing...`); await sender.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples-dev/session.ts b/sdk/servicebus/service-bus/samples-dev/session.ts index 5e1ff675602..ef39572e5da 100644 --- a/sdk/servicebus/service-bus/samples-dev/session.ts +++ b/sdk/servicebus/service-bus/samples-dev/session.ts @@ -43,12 +43,14 @@ export async function main() { const sbClient = new ServiceBusClient(connectionString); try { + console.log(`Sending 5 messages to 'session-1'`); await sendMessage(sbClient, listOfScientists[0], "session-1"); await sendMessage(sbClient, listOfScientists[1], "session-1"); await sendMessage(sbClient, listOfScientists[2], "session-1"); await sendMessage(sbClient, listOfScientists[3], "session-1"); await sendMessage(sbClient, listOfScientists[4], "session-1"); + console.log(`Sending 5 messages to 'session-2'`); await sendMessage(sbClient, listOfScientists[5], "session-2"); await sendMessage(sbClient, listOfScientists[6], "session-2"); await sendMessage(sbClient, listOfScientists[7], "session-2"); @@ -80,22 +82,46 @@ async function sendMessage(sbClient: ServiceBusClient, scientist: any, sessionId async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) { // If receiving from a subscription you can use the acceptSession(topic, subscription, sessionId) overload - const receiver = await sbClient.acceptSession(queueName, sessionId); + let endDate: number | undefined; - const processMessage = async (message: ServiceBusMessage) => { - console.log(`Received: ${message.sessionId} - ${message.body} `); - }; - const processError = async (args: ProcessErrorArgs) => { - console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error); - }; - receiver.subscribe({ - processMessage, - processError - }); + while (true) { + console.log(`Creating session receiver for session '${sessionId}'`); + const receiver = await sbClient.acceptSession(queueName, sessionId); - await delay(5000); + const subscribePromise = new Promise((_, reject) => { + const processMessage = async (message: ServiceBusMessage) => { + console.log(`Received: ${message.sessionId} - ${message.body} `); + }; + const processError = async (args: ProcessErrorArgs) => { + console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error); + reject(args.error); + }; - await receiver.close(); + receiver.subscribe({ + processMessage, + processError + }); + }); + + const now = Date.now(); + endDate = endDate ?? now + 20000; + let remainingTime: number = endDate - now; + + console.log(`Waiting for ${remainingTime} milliseconds for messages to arrive.`); + + try { + await Promise.race([subscribePromise, delay(remainingTime)]); + + // wait time has expired, we can stop listening. + console.log(`Time has expired, closing receiver for session '${sessionId}'`); + + await receiver.close(); + break; + } catch (err) { + // `err` was already logged part of `processError` above. + await receiver.close(); + } + } } main().catch((err) => { diff --git a/sdk/servicebus/service-bus/samples/v7/javascript/browseMessages.js b/sdk/servicebus/service-bus/samples/v7/javascript/browseMessages.js index eb504cb51ac..46dc6e00377 100644 --- a/sdk/servicebus/service-bus/samples/v7/javascript/browseMessages.js +++ b/sdk/servicebus/service-bus/samples/v7/javascript/browseMessages.js @@ -28,14 +28,18 @@ async function main() { const queueReceiver = sbClient.createReceiver(queueName); try { - for (let i = 0; i < 20; i++) { - const [message] = await queueReceiver.peekMessages(1); - if (!message) { - console.log("No more messages to peek"); - break; - } - console.log(`Peeking message #${i}: ${message.body}`); + // peeking messages does not lock or remove messages from a queue or subscription. + // For locking and/or removal, look at the `receiveMessagesLoop` or `receiveMessagesStreaming` samples, + // which cover using a receiver with a `receiveMode`. + console.log(`Attempting to peek 10 messages at a time`); + const peekedMessages = await queueReceiver.peekMessages(10); + + console.log(`Got ${peekedMessages.length} messages.`); + + for (let i = 0; i < peekedMessages.length; ++i) { + console.log(`Peeked message #${i}: ${peekedMessages[i].body}`); } + await queueReceiver.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples/v7/javascript/receiveMessagesLoop.js b/sdk/servicebus/service-bus/samples/v7/javascript/receiveMessagesLoop.js index 8fbc15b6536..aa757b1f802 100644 --- a/sdk/servicebus/service-bus/samples/v7/javascript/receiveMessagesLoop.js +++ b/sdk/servicebus/service-bus/samples/v7/javascript/receiveMessagesLoop.js @@ -30,9 +30,15 @@ async function main() { // To receive messages from sessions, use getSessionReceiver instead of getReceiver or look at // the sample in sessions.ts file try { - for (let i = 0; i < 10; i++) { - const messages = await queueReceiver.receiveMessages(1, { - maxWaitTimeInMs: 5000 + let allMessages = []; + + console.log(`Receiving 10 messages...`); + + while (allMessages.length < 10) { + // NOTE: asking for 10 messages does not guarantee that we will return + // all 10 at once so we must loop until we get all the messages we expected. + const messages = await queueReceiver.receiveMessages(10, { + maxWaitTimeInMs: 60 * 1000 }); if (!messages.length) { @@ -40,9 +46,17 @@ async function main() { break; } - console.log(`Received message #${i}: ${messages[0].body}`); - await queueReceiver.completeMessage(messages[0]); + console.log(`Received ${messages.length} messages`); + allMessages.push(...messages); + + for (let message of messages) { + console.log(` Message: '${message.body}'`); + + // completing the message will remove it from the remote queue or subscription. + await queueReceiver.completeMessage(message); + } } + await queueReceiver.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples/v7/javascript/sendMessages.js b/sdk/servicebus/service-bus/samples/v7/javascript/sendMessages.js index 7cd56eae821..20eac64fd36 100644 --- a/sdk/servicebus/service-bus/samples/v7/javascript/sendMessages.js +++ b/sdk/servicebus/service-bus/samples/v7/javascript/sendMessages.js @@ -23,12 +23,15 @@ dotenv.config(); const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; -const messages = [ +const firstSetOfMessages = [ { body: "Albert Einstein" }, { body: "Werner Heisenberg" }, { body: "Marie Curie" }, { body: "Steven Hawking" }, - { body: "Isaac Newton" }, + { body: "Isaac Newton" } +]; + +const secondSetOfMessages = [ { body: "Niels Bohr" }, { body: "Michael Faraday" }, { body: "Galileo Galilei" }, @@ -45,27 +48,29 @@ async function main() { try { // Tries to send all messages in a single batch. // Will fail if the messages cannot fit in a batch. - await sender.sendMessages(messages); + console.log(`Sending the first 5 scientists (as an array)`); + await sender.sendMessages(firstSetOfMessages); // Sends all messages using one or more ServiceBusMessageBatch objects as required let batch = await sender.createMessageBatch(); - for (let i = 0; i < messages.length; i++) { - const message = messages[i]; + for (const message of secondSetOfMessages) { if (!batch.tryAddMessage(message)) { // Send the current batch as it is full and create a new one await sender.sendMessages(batch); batch = await sender.createMessageBatch(); - if (!batch.tryAddMessage(messages[i])) { + if (!batch.tryAddMessage(message)) { throw new Error("Message too big to fit in a batch"); } } } // Send the batch + console.log(`Sending the last 5 scientists (as a ServiceBusMessageBatch)`); await sender.sendMessages(batch); // Close the sender + console.log(`Done sending, closing...`); await sender.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples/v7/javascript/session.js b/sdk/servicebus/service-bus/samples/v7/javascript/session.js index 3dfd6da0e85..30df96e94c0 100644 --- a/sdk/servicebus/service-bus/samples/v7/javascript/session.js +++ b/sdk/servicebus/service-bus/samples/v7/javascript/session.js @@ -42,12 +42,14 @@ async function main() { const sbClient = new ServiceBusClient(connectionString); try { + console.log(`Sending 5 messages to 'session-1'`); await sendMessage(sbClient, listOfScientists[0], "session-1"); await sendMessage(sbClient, listOfScientists[1], "session-1"); await sendMessage(sbClient, listOfScientists[2], "session-1"); await sendMessage(sbClient, listOfScientists[3], "session-1"); await sendMessage(sbClient, listOfScientists[4], "session-1"); + console.log(`Sending 5 messages to 'session-2'`); await sendMessage(sbClient, listOfScientists[5], "session-2"); await sendMessage(sbClient, listOfScientists[6], "session-2"); await sendMessage(sbClient, listOfScientists[7], "session-2"); @@ -79,22 +81,46 @@ async function sendMessage(sbClient, scientist, sessionId) { async function receiveMessages(sbClient, sessionId) { // If receiving from a subscription you can use the acceptSession(topic, subscription, sessionId) overload - const receiver = await sbClient.acceptSession(queueName, sessionId); + let endDate; - const processMessage = async (message) => { - console.log(`Received: ${message.sessionId} - ${message.body} `); - }; - const processError = async (args) => { - console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error); - }; - receiver.subscribe({ - processMessage, - processError - }); + while (true) { + console.log(`Creating session receiver for session '${sessionId}'`); + const receiver = await sbClient.acceptSession(queueName, sessionId); - await delay(5000); + const subscribePromise = new Promise((_, reject) => { + const processMessage = async (message) => { + console.log(`Received: ${message.sessionId} - ${message.body} `); + }; + const processError = async (args) => { + console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error); + reject(args.error); + }; - await receiver.close(); + receiver.subscribe({ + processMessage, + processError + }); + }); + + const now = Date.now(); + endDate = endDate ?? now + 20000; + let remainingTime = endDate - now; + + console.log(`Waiting for ${remainingTime} milliseconds for messages to arrive.`); + + try { + await Promise.race([subscribePromise, delay(remainingTime)]); + + // wait time has expired, we can stop listening. + console.log(`Time has expired, closing receiver for session '${sessionId}'`); + + await receiver.close(); + break; + } catch (err) { + // `err` was already logged part of `processError` above. + await receiver.close(); + } + } } main().catch((err) => { diff --git a/sdk/servicebus/service-bus/samples/v7/typescript/src/browseMessages.ts b/sdk/servicebus/service-bus/samples/v7/typescript/src/browseMessages.ts index b7f66ebcc95..4aa83efe380 100644 --- a/sdk/servicebus/service-bus/samples/v7/typescript/src/browseMessages.ts +++ b/sdk/servicebus/service-bus/samples/v7/typescript/src/browseMessages.ts @@ -28,14 +28,18 @@ export async function main() { const queueReceiver = sbClient.createReceiver(queueName); try { - for (let i = 0; i < 20; i++) { - const [message] = await queueReceiver.peekMessages(1); - if (!message) { - console.log("No more messages to peek"); - break; - } - console.log(`Peeking message #${i}: ${message.body}`); + // peeking messages does not lock or remove messages from a queue or subscription. + // For locking and/or removal, look at the `receiveMessagesLoop` or `receiveMessagesStreaming` samples, + // which cover using a receiver with a `receiveMode`. + console.log(`Attempting to peek 10 messages at a time`); + const peekedMessages = await queueReceiver.peekMessages(10); + + console.log(`Got ${peekedMessages.length} messages.`); + + for (let i = 0; i < peekedMessages.length; ++i) { + console.log(`Peeked message #${i}: ${peekedMessages[i].body}`); } + await queueReceiver.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples/v7/typescript/src/receiveMessagesLoop.ts b/sdk/servicebus/service-bus/samples/v7/typescript/src/receiveMessagesLoop.ts index ab7e392272a..1089c35a896 100644 --- a/sdk/servicebus/service-bus/samples/v7/typescript/src/receiveMessagesLoop.ts +++ b/sdk/servicebus/service-bus/samples/v7/typescript/src/receiveMessagesLoop.ts @@ -30,9 +30,15 @@ export async function main() { // To receive messages from sessions, use getSessionReceiver instead of getReceiver or look at // the sample in sessions.ts file try { - for (let i = 0; i < 10; i++) { - const messages = await queueReceiver.receiveMessages(1, { - maxWaitTimeInMs: 5000 + let allMessages = []; + + console.log(`Receiving 10 messages...`); + + while (allMessages.length < 10) { + // NOTE: asking for 10 messages does not guarantee that we will return + // all 10 at once so we must loop until we get all the messages we expected. + const messages = await queueReceiver.receiveMessages(10, { + maxWaitTimeInMs: 60 * 1000 }); if (!messages.length) { @@ -40,9 +46,17 @@ export async function main() { break; } - console.log(`Received message #${i}: ${messages[0].body}`); - await queueReceiver.completeMessage(messages[0]); + console.log(`Received ${messages.length} messages`); + allMessages.push(...messages); + + for (let message of messages) { + console.log(` Message: '${message.body}'`); + + // completing the message will remove it from the remote queue or subscription. + await queueReceiver.completeMessage(message); + } } + await queueReceiver.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples/v7/typescript/src/sendMessages.ts b/sdk/servicebus/service-bus/samples/v7/typescript/src/sendMessages.ts index 99a0b5dfb9f..f6b7e4b4443 100644 --- a/sdk/servicebus/service-bus/samples/v7/typescript/src/sendMessages.ts +++ b/sdk/servicebus/service-bus/samples/v7/typescript/src/sendMessages.ts @@ -13,7 +13,7 @@ * @summary Demonstrates how to send messages to Service Bus Queue/Topic */ -import { ServiceBusClient, ServiceBusMessage } from "@azure/service-bus"; +import { ServiceBusClient, ServiceBusMessage, ServiceBusMessageBatch } from "@azure/service-bus"; // Load the .env file if it exists import * as dotenv from "dotenv"; @@ -23,12 +23,15 @@ dotenv.config(); const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; -const messages: ServiceBusMessage[] = [ +const firstSetOfMessages: ServiceBusMessage[] = [ { body: "Albert Einstein" }, { body: "Werner Heisenberg" }, { body: "Marie Curie" }, { body: "Steven Hawking" }, - { body: "Isaac Newton" }, + { body: "Isaac Newton" } +]; + +const secondSetOfMessages: ServiceBusMessage[] = [ { body: "Niels Bohr" }, { body: "Michael Faraday" }, { body: "Galileo Galilei" }, @@ -45,27 +48,29 @@ export async function main() { try { // Tries to send all messages in a single batch. // Will fail if the messages cannot fit in a batch. - await sender.sendMessages(messages); + console.log(`Sending the first 5 scientists (as an array)`); + await sender.sendMessages(firstSetOfMessages); // Sends all messages using one or more ServiceBusMessageBatch objects as required - let batch = await sender.createMessageBatch(); + let batch: ServiceBusMessageBatch = await sender.createMessageBatch(); - for (let i = 0; i < messages.length; i++) { - const message = messages[i]; + for (const message of secondSetOfMessages) { if (!batch.tryAddMessage(message)) { // Send the current batch as it is full and create a new one await sender.sendMessages(batch); batch = await sender.createMessageBatch(); - if (!batch.tryAddMessage(messages[i])) { + if (!batch.tryAddMessage(message)) { throw new Error("Message too big to fit in a batch"); } } } // Send the batch + console.log(`Sending the last 5 scientists (as a ServiceBusMessageBatch)`); await sender.sendMessages(batch); // Close the sender + console.log(`Done sending, closing...`); await sender.close(); } finally { await sbClient.close(); diff --git a/sdk/servicebus/service-bus/samples/v7/typescript/src/session.ts b/sdk/servicebus/service-bus/samples/v7/typescript/src/session.ts index 962ca5dd551..862bcea6cc2 100644 --- a/sdk/servicebus/service-bus/samples/v7/typescript/src/session.ts +++ b/sdk/servicebus/service-bus/samples/v7/typescript/src/session.ts @@ -42,12 +42,14 @@ export async function main() { const sbClient = new ServiceBusClient(connectionString); try { + console.log(`Sending 5 messages to 'session-1'`); await sendMessage(sbClient, listOfScientists[0], "session-1"); await sendMessage(sbClient, listOfScientists[1], "session-1"); await sendMessage(sbClient, listOfScientists[2], "session-1"); await sendMessage(sbClient, listOfScientists[3], "session-1"); await sendMessage(sbClient, listOfScientists[4], "session-1"); + console.log(`Sending 5 messages to 'session-2'`); await sendMessage(sbClient, listOfScientists[5], "session-2"); await sendMessage(sbClient, listOfScientists[6], "session-2"); await sendMessage(sbClient, listOfScientists[7], "session-2"); @@ -79,22 +81,46 @@ async function sendMessage(sbClient: ServiceBusClient, scientist: any, sessionId async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) { // If receiving from a subscription you can use the acceptSession(topic, subscription, sessionId) overload - const receiver = await sbClient.acceptSession(queueName, sessionId); + let endDate: number | undefined; - const processMessage = async (message: ServiceBusMessage) => { - console.log(`Received: ${message.sessionId} - ${message.body} `); - }; - const processError = async (args: ProcessErrorArgs) => { - console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error); - }; - receiver.subscribe({ - processMessage, - processError - }); + while (true) { + console.log(`Creating session receiver for session '${sessionId}'`); + const receiver = await sbClient.acceptSession(queueName, sessionId); - await delay(5000); + const subscribePromise = new Promise((_, reject) => { + const processMessage = async (message: ServiceBusMessage) => { + console.log(`Received: ${message.sessionId} - ${message.body} `); + }; + const processError = async (args: ProcessErrorArgs) => { + console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error); + reject(args.error); + }; - await receiver.close(); + receiver.subscribe({ + processMessage, + processError + }); + }); + + const now = Date.now(); + endDate = endDate ?? now + 20000; + let remainingTime: number = endDate - now; + + console.log(`Waiting for ${remainingTime} milliseconds for messages to arrive.`); + + try { + await Promise.race([subscribePromise, delay(remainingTime)]); + + // wait time has expired, we can stop listening. + console.log(`Time has expired, closing receiver for session '${sessionId}'`); + + await receiver.close(); + break; + } catch (err) { + // `err` was already logged part of `processError` above. + await receiver.close(); + } + } } main().catch((err) => {