Adding back the get started samples
This commit is contained in:
Родитель
e4c1ca392a
Коммит
0cd631904e
|
@ -0,0 +1,12 @@
|
||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
|
<PropertyGroup>
|
||||||
|
<OutputType>Exe</OutputType>
|
||||||
|
<TargetFramework>netcoreapp2.0</TargetFramework>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
</Project>
|
|
@ -0,0 +1,112 @@
|
||||||
|
// Copyright (c) Microsoft. All rights reserved.
|
||||||
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
|
namespace BasicSendReceiveUsingQueueClient
|
||||||
|
{
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
using System;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
class Program
|
||||||
|
{
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// 'Shared Access policies' section.
|
||||||
|
const string ServiceBusConnectionString = "{ServiceBus connection string}";
|
||||||
|
const string QueueName = "{Queue Name}";
|
||||||
|
static IQueueClient queueClient;
|
||||||
|
|
||||||
|
static void Main(string[] args)
|
||||||
|
{
|
||||||
|
MainAsync().GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task MainAsync()
|
||||||
|
{
|
||||||
|
const int numberOfMessages = 10;
|
||||||
|
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
|
||||||
|
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
Console.WriteLine("Press any key to exit after receiving all the messages.");
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
|
||||||
|
// Register QueueClient's MessageHandler and receive messages in a loop
|
||||||
|
RegisterOnMessageHandlerAndReceiveMessages();
|
||||||
|
|
||||||
|
// Send Messages
|
||||||
|
await SendMessagesAsync(numberOfMessages);
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await queueClient.CloseAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void RegisterOnMessageHandlerAndReceiveMessages()
|
||||||
|
{
|
||||||
|
// Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
|
||||||
|
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
|
||||||
|
{
|
||||||
|
// Maximum number of Concurrent calls to the callback `ProcessMessagesAsync`, set to 1 for simplicity.
|
||||||
|
// Set it according to how many messages the application wants to process in parallel.
|
||||||
|
MaxConcurrentCalls = 1,
|
||||||
|
|
||||||
|
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
|
||||||
|
// False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
|
||||||
|
AutoComplete = false
|
||||||
|
};
|
||||||
|
|
||||||
|
// Register the function that will process messages
|
||||||
|
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
|
||||||
|
{
|
||||||
|
// Process the message
|
||||||
|
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
|
||||||
|
// Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
|
||||||
|
// If queueClient has already been Closed, you may chose to not call CompleteAsync() or AbandonAsync() etc. calls
|
||||||
|
// to avoid unnecessary exceptions.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use this Handler to look at the exceptions received on the MessagePump
|
||||||
|
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
|
||||||
|
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
|
||||||
|
Console.WriteLine("Exception context for troubleshooting:");
|
||||||
|
Console.WriteLine($"- Endpoint: {context.Endpoint}");
|
||||||
|
Console.WriteLine($"- Entity Path: {context.EntityPath}");
|
||||||
|
Console.WriteLine($"- Executing Action: {context.Action}");
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task SendMessagesAsync(int numberOfMessagesToSend)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (var i = 0; i < numberOfMessagesToSend; i++)
|
||||||
|
{
|
||||||
|
// Create a new message to send to the queue
|
||||||
|
string messageBody = $"Message {i}";
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
|
||||||
|
// Write the body of the message to the console
|
||||||
|
Console.WriteLine($"Sending message: {messageBody}");
|
||||||
|
|
||||||
|
// Send the message to the queue
|
||||||
|
await queueClient.SendAsync(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,171 @@
|
||||||
|
# Get started sending and receiving messages from ServiceBus queues using QueueClient
|
||||||
|
|
||||||
|
In order to run the sample in this directory, replace the following bracketed values in the `Program.cs` file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// `Shared Access policies` section.
|
||||||
|
const string ServiceBusConnectionString = "{ServiceBus connection string}";
|
||||||
|
const string QueueName = "{Queue Name}";
|
||||||
|
```
|
||||||
|
|
||||||
|
Once you replace the above values run the following from a command prompt:
|
||||||
|
|
||||||
|
```
|
||||||
|
dotnet restore
|
||||||
|
dotnet build
|
||||||
|
dotnet run
|
||||||
|
```
|
||||||
|
|
||||||
|
## The Sample Program
|
||||||
|
To keep things reasonably simple, the sample program keeps send and receive code within a single hosting application.
|
||||||
|
Typically in real world applications these roles are often spread across applications, services, or at least across
|
||||||
|
independently deployed and run tiers of applications or services. For clarity, the send and receive activities are kept as
|
||||||
|
separate methods as if they were different apps.
|
||||||
|
|
||||||
|
For further information on how to create this sample on your own, follow the rest of the tutorial.
|
||||||
|
|
||||||
|
## What will be accomplished
|
||||||
|
In this tutorial, we will write a console application to send and receive messages to a ServiceBus queue using a QueueClient.
|
||||||
|
QueueClient offers a simple API surface to send message(or messages in a batch) and offers a simple MessagePump model to receive messages.
|
||||||
|
Once a message process handler is registered as shown below, the User code does not have to write explicit code to receive messages and
|
||||||
|
if configured using `MessageHandlerOptions`, does not have to write explicit code to renew message locks or complete messages or improve
|
||||||
|
the degree of concurrency of message processing. Hence the queueClient can be used in scenarios where the User wants to get started
|
||||||
|
quickly or the scenarios where they need basic send/receive and wants to achieve that with as little code writing as possible.
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
1. [.NET Core](https://www.microsoft.com/net/core)
|
||||||
|
2. An Azure subscription.
|
||||||
|
3. [A ServiceBus namespace](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal)
|
||||||
|
4. [A ServiceBus queue](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues#2-create-a-queue-using-the-azure-portal)
|
||||||
|
|
||||||
|
### Create a console application
|
||||||
|
|
||||||
|
- Create a new .NET Core application. Check out [this link](https://docs.microsoft.com/en-us/dotnet/articles/core/getting-started) with help to create a new application on your operating system.
|
||||||
|
|
||||||
|
### Add the ServiceBus client reference
|
||||||
|
|
||||||
|
1. Add the following to your project.json, making sure that the solution references the `Microsoft.Azure.ServiceBus` project.
|
||||||
|
|
||||||
|
```json
|
||||||
|
"Microsoft.Azure.ServiceBus": "1.0.0"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Write some code to send and receive messages from the queue
|
||||||
|
1. Add the following using statement to the top of the Program.cs file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following variables to the `Program` class, and replace the placeholder values:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string QueueName = "{Queue Name}";
|
||||||
|
static IQueueClient queueClient;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new Task called `ProcessMessagesAsync` that knows how to handle received messages with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
|
||||||
|
{
|
||||||
|
// Process the message
|
||||||
|
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new Task called `ExceptionReceivedHandler` to look at the exceptions received on the MessagePump. This will be useful for debugging purposes.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Use this Handler to look at the exceptions received on the MessagePump
|
||||||
|
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called 'RegisterOnMessageHandlerAndReceiveMessages' to register the `ProcessMessagesAsync` and the
|
||||||
|
`ExceptionReceivedHandler` with the necessary `MessageHandlerOptions` parameters to start receiving messages
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static void RegisterOnMessageHandlerAndReceiveMessages()
|
||||||
|
{
|
||||||
|
// Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
|
||||||
|
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
|
||||||
|
{
|
||||||
|
// Maximum number of Concurrent calls to the callback `ProcessMessagesAsync`, set to 1 for simplicity.
|
||||||
|
// Set it according to how many messages the application wants to process in parallel.
|
||||||
|
MaxConcurrentCalls = 1,
|
||||||
|
|
||||||
|
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
|
||||||
|
// False value below indicates the Complete will be handled by the User Callback as seen in `ProcessMessagesAsync`.
|
||||||
|
AutoComplete = false
|
||||||
|
};
|
||||||
|
|
||||||
|
// Register the function that will process messages
|
||||||
|
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `SendMessagesAsync` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Sends messages to the queue.
|
||||||
|
static async Task SendMessagesAsync(int numberOfMessagesToSend)
|
||||||
|
{
|
||||||
|
for (var i = 0; i < numberOfMessagesToSend; i++)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Create a new message to send to the queue
|
||||||
|
string messageBody = $"Message {i}";
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
|
||||||
|
// Write the body of the message to the console
|
||||||
|
Console.WriteLine($"Sending message: {messageBody}");
|
||||||
|
|
||||||
|
// Send the message to the queue
|
||||||
|
await queueClient.SendAsync(message);
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `MainAsync` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task MainAsync(string[] args)
|
||||||
|
{
|
||||||
|
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
|
||||||
|
|
||||||
|
// Register QueueClient's MessageHandler and receive messages in a loop
|
||||||
|
RegisterOnMessageHandlerAndReceiveMessages();
|
||||||
|
|
||||||
|
// Send Messages
|
||||||
|
await SendMessagesToQueue(10);
|
||||||
|
|
||||||
|
Console.WriteLine("Press any key to exit after receiving all the messages.");
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await queueClient.CloseAsync();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following code to the `Main` method:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
MainAsync(args).GetAwaiter().GetResult();
|
||||||
|
```
|
||||||
|
|
||||||
|
Congratulations! You have now sent and received messages to a ServiceBus queue, using QueueClient.
|
|
@ -0,0 +1,12 @@
|
||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
|
<PropertyGroup>
|
||||||
|
<OutputType>Exe</OutputType>
|
||||||
|
<TargetFramework>netcoreapp2.0</TargetFramework>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
</Project>
|
|
@ -0,0 +1,116 @@
|
||||||
|
// Copyright (c) Microsoft. All rights reserved.
|
||||||
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
|
namespace BasicSendReceiveUsingTopicSubscriptionClient
|
||||||
|
{
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
using System;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
class Program
|
||||||
|
{
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// 'Shared Access policies' section.
|
||||||
|
const string ServiceBusConnectionString = "{ServiceBus connection string}";
|
||||||
|
const string TopicName = "{Topic Name}";
|
||||||
|
const string SubscriptionName = "{Subscription Name}";
|
||||||
|
static ITopicClient topicClient;
|
||||||
|
static ISubscriptionClient subscriptionClient;
|
||||||
|
|
||||||
|
static void Main(string[] args)
|
||||||
|
{
|
||||||
|
MainAsync().GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task MainAsync()
|
||||||
|
{
|
||||||
|
const int numberOfMessages = 10;
|
||||||
|
topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
|
||||||
|
subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
|
||||||
|
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
Console.WriteLine("Press any key to exit after receiving all the messages.");
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
|
||||||
|
// Register Subscription's MessageHandler and receive messages in a loop
|
||||||
|
RegisterOnMessageHandlerAndReceiveMessages();
|
||||||
|
|
||||||
|
// Send Messages
|
||||||
|
await SendMessagesAsync(numberOfMessages);
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await subscriptionClient.CloseAsync();
|
||||||
|
await topicClient.CloseAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void RegisterOnMessageHandlerAndReceiveMessages()
|
||||||
|
{
|
||||||
|
// Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
|
||||||
|
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
|
||||||
|
{
|
||||||
|
// Maximum number of Concurrent calls to the callback `ProcessMessagesAsync`, set to 1 for simplicity.
|
||||||
|
// Set it according to how many messages the application wants to process in parallel.
|
||||||
|
MaxConcurrentCalls = 1,
|
||||||
|
|
||||||
|
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
|
||||||
|
// False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
|
||||||
|
AutoComplete = false
|
||||||
|
};
|
||||||
|
|
||||||
|
// Register the function that will process messages
|
||||||
|
subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
|
||||||
|
{
|
||||||
|
// Process the message
|
||||||
|
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
|
||||||
|
// Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
|
||||||
|
// If subscriptionClient has already been Closed, you may chose to not call CompleteAsync() or AbandonAsync() etc. calls
|
||||||
|
// to avoid unnecessary exceptions.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use this Handler to look at the exceptions received on the MessagePump
|
||||||
|
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
|
||||||
|
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
|
||||||
|
Console.WriteLine("Exception context for troubleshooting:");
|
||||||
|
Console.WriteLine($"- Endpoint: {context.Endpoint}");
|
||||||
|
Console.WriteLine($"- Entity Path: {context.EntityPath}");
|
||||||
|
Console.WriteLine($"- Executing Action: {context.Action}");
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task SendMessagesAsync(int numberOfMessagesToSend)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (var i = 0; i < numberOfMessagesToSend; i++)
|
||||||
|
{
|
||||||
|
// Create a new message to send to the topic
|
||||||
|
string messageBody = $"Message {i}";
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
|
||||||
|
// Write the body of the message to the console
|
||||||
|
Console.WriteLine($"Sending message: {messageBody}");
|
||||||
|
|
||||||
|
// Send the message to the topic
|
||||||
|
await topicClient.SendAsync(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,185 @@
|
||||||
|
# Get started sending messages to a topic using topicClient and receiving those messages from Subscription using SubscriptionClient
|
||||||
|
|
||||||
|
In order to run the sample in this directory, replace the following bracketed values in the `Program.cs` file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// `Shared Access policies` section.
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string TopicName = "{Topic Name}";
|
||||||
|
const string SubscriptionName = "{Subscription Name}";
|
||||||
|
```
|
||||||
|
|
||||||
|
Once you replace the above values run the following from a command prompt:
|
||||||
|
|
||||||
|
```
|
||||||
|
dotnet restore
|
||||||
|
dotnet build
|
||||||
|
dotnet run
|
||||||
|
```
|
||||||
|
|
||||||
|
## The Sample Program
|
||||||
|
To keep things reasonably simple, the sample program keeps send and receive code within a single hosting application.
|
||||||
|
Typically in real world applications these roles are often spread across applications, services, or at least across
|
||||||
|
independently deployed and run tiers of applications or services. For clarity, the send and receive activities are kept as
|
||||||
|
separate methods as if they were different apps.
|
||||||
|
|
||||||
|
For further information on how to create this sample on your own, follow the rest of the tutorial.
|
||||||
|
|
||||||
|
## What will be accomplished
|
||||||
|
Topics are similar to Queues for the send side of the application. However unlike Queues, Topic can have zero or more subscriptions,
|
||||||
|
from which messages can be retrieved and each of subscription act like independent queues. Whether a message is selected into the
|
||||||
|
subscription is determined by the Filter condition for the subscription.
|
||||||
|
|
||||||
|
In this tutorial, we will write a console application to send messages to the topic using topicClient and receive those messages using
|
||||||
|
SubscriptionClient. TopicClient offers a simple API surface to send message(or messages in a batch). SubscriptionClient offers a simple
|
||||||
|
MessagePump model to receive messages. TopicClient cannot be used to receive messages and SubscriptionClient cannot be used to send messages.
|
||||||
|
Once a message process handler is registered as shown below, the User code does not have to write explicit code to receive messages and if
|
||||||
|
configured using `MessageHandlerOptions`, does not have to write explicit code to renew message locks or complete messages or improve the
|
||||||
|
degree of concurrency of message processing. Hence the Subscriptionclient can be used in scenarios where the User wants to get started
|
||||||
|
quickly or the scenarios where they need basic send/receive and wants to achieve that with as little code writing as possible.
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
1. [.NET Core](https://www.microsoft.com/net/core)
|
||||||
|
2. An Azure subscription.
|
||||||
|
3. [A ServiceBus namespace](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal)
|
||||||
|
4. [A ServiceBus Topic](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions#2-create-a-topic-using-the-azure-portal)
|
||||||
|
5. [A ServiceBus Subscription](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions)
|
||||||
|
|
||||||
|
### Create a console application
|
||||||
|
|
||||||
|
- Create a new .NET Core application. Check out [this link](https://docs.microsoft.com/en-us/dotnet/articles/core/getting-started) with help to create a new application on your operating system.
|
||||||
|
|
||||||
|
### Add the ServiceBus client reference
|
||||||
|
|
||||||
|
1. Add the following to your project.json, making sure that the solution references the `Microsoft.Azure.ServiceBus` project.
|
||||||
|
|
||||||
|
```json
|
||||||
|
"Microsoft.Azure.ServiceBus": "1.0.0"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Write some code to send messages to the topic and receive messages from the subscription
|
||||||
|
1. Add the following using statement to the top of the Program.cs file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following variables to the `Program` class, and replace the placeholder values:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string TopicName = "{Topic Name}";
|
||||||
|
const string SubscriptionName = "{Subscription Name}";
|
||||||
|
static ITopicClient topicClient;
|
||||||
|
static ISubscriptionClient subscriptionClient;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new Task called `ProcessMessagesAsync` that knows how to handle received messages with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
|
||||||
|
{
|
||||||
|
// Process the message
|
||||||
|
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the subscriptionClient is opened in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new Task called `ExceptionReceivedHandler` to look at the exceptions received on the MessagePump. This will be useful for debugging purposes.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Use this Handler to look at the exceptions received on the MessagePump
|
||||||
|
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called 'RegisterOnMessageHandlerAndReceiveMessages' to register the `ProcessMessagesAsync` and the
|
||||||
|
`ExceptionReceivedHandler` with the necessary `MessageHandlerOptions` parameters to start receiving messages
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static void RegisterOnMessageHandlerAndReceiveMessages()
|
||||||
|
{
|
||||||
|
// Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
|
||||||
|
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
|
||||||
|
{
|
||||||
|
// Maximum number of Concurrent calls to the callback `ProcessMessagesAsync`, set to 1 for simplicity.
|
||||||
|
// Set it according to how many messages the application wants to process in parallel.
|
||||||
|
MaxConcurrentCalls = 1,
|
||||||
|
|
||||||
|
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
|
||||||
|
// False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
|
||||||
|
AutoComplete = false
|
||||||
|
};
|
||||||
|
|
||||||
|
// Register the function that will process messages
|
||||||
|
subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `SendMessagesAsync` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task SendMessagesAsync(int numberOfMessagesToSend)
|
||||||
|
{
|
||||||
|
for (var i = 0; i < numberOfMessagesToSend; i++)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Create a new message to send to the topic
|
||||||
|
string messageBody = $"Message {i}";
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
|
||||||
|
// Write the body of the message to the console
|
||||||
|
Console.WriteLine($"Sending message: {messageBody}");
|
||||||
|
|
||||||
|
// Send the message to the topic
|
||||||
|
await topicClient.SendAsync(message);
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `MainAsync` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task MainAsync(string[] args)
|
||||||
|
{
|
||||||
|
const int numberOfMessages = 10;
|
||||||
|
topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
|
||||||
|
subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
|
||||||
|
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
Console.WriteLine("Press any key to exit after receiving all the messages.");
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
|
||||||
|
// Register Subscription's MessageHandler and receive messages in a loop
|
||||||
|
RegisterOnMessageHandlerAndReceiveMessages();
|
||||||
|
|
||||||
|
// Send Messages
|
||||||
|
await SendMessagesAsync(numberOfMessages);
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await subscriptionClient.CloseAsync();
|
||||||
|
await topicClient.CloseAsync();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following code to the `Main` method:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
MainAsync(args).GetAwaiter().GetResult();
|
||||||
|
```
|
||||||
|
|
||||||
|
Congratulations! You have now sent messages to a ServiceBus Topic and received messages from a ServiceBus Subscription.
|
|
@ -0,0 +1,12 @@
|
||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
|
<PropertyGroup>
|
||||||
|
<OutputType>Exe</OutputType>
|
||||||
|
<TargetFramework>netcoreapp2.0</TargetFramework>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
</Project>
|
|
@ -0,0 +1,134 @@
|
||||||
|
// Copyright (c) Microsoft. All rights reserved.
|
||||||
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
|
namespace BasicSessionSendReceiveUsingQueueClient
|
||||||
|
{
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
class Program
|
||||||
|
{
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// 'Shared Access policies' section.
|
||||||
|
const string ServiceBusConnectionString = "{ServiceBus connection string}";
|
||||||
|
const string QueueName = "{Queue Name of a Queue that supports sessions}";
|
||||||
|
static IQueueClient queueClient;
|
||||||
|
|
||||||
|
static void Main(string[] args)
|
||||||
|
{
|
||||||
|
MainAsync().GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task MainAsync()
|
||||||
|
{
|
||||||
|
const int numberOfSessions = 5;
|
||||||
|
const int numberOfMessagesPerSession = 3;
|
||||||
|
|
||||||
|
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
|
||||||
|
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
Console.WriteLine("Press any key to exit after receiving all the messages.");
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
|
||||||
|
// Register Session Handler and Receive Session Messages
|
||||||
|
RegisterOnSessionHandlerAndReceiveSessionMessages();
|
||||||
|
|
||||||
|
// Send messages with sessionId set
|
||||||
|
await SendSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await queueClient.CloseAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void RegisterOnSessionHandlerAndReceiveSessionMessages()
|
||||||
|
{
|
||||||
|
// Configure the SessionHandler Options in terms of exception handling, number of concurrent sessions to deliver etc.
|
||||||
|
var sessionHandlerOptions =
|
||||||
|
new SessionHandlerOptions(ExceptionReceivedHandler)
|
||||||
|
{
|
||||||
|
// Maximum number of Concurrent calls to the callback `ProcessSessionMessagesAsync`
|
||||||
|
// Value 2 below indicates the callback can be called with a message for 2 unique
|
||||||
|
// session Id's in parallel. Set it according to how many messages the application
|
||||||
|
// wants to process in parallel.
|
||||||
|
MaxConcurrentSessions = 2,
|
||||||
|
|
||||||
|
// Indicates the maximum time the Session Pump should wait for receiving messages for sessions.
|
||||||
|
// If no message is received within the specified time, the pump will close that session and try to get messages
|
||||||
|
// from a different session. Default is to wait for 1 minute to fetch messages for a session. Set to a 1 second
|
||||||
|
// value here to allow the sample execution to finish fast but ideally leave this as 1 minute unless there
|
||||||
|
// is a specific reason to timeout earlier.
|
||||||
|
MessageWaitTimeout = TimeSpan.FromSeconds(1),
|
||||||
|
|
||||||
|
// Indicates whether SessionPump should automatically complete the messages after returning from User Callback.
|
||||||
|
// False below indicates the Complete will be handled by the User Callback as in `ProcessSessionMessagesAsync`.
|
||||||
|
AutoComplete = false
|
||||||
|
};
|
||||||
|
|
||||||
|
// Register the function that will process session messages
|
||||||
|
queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await session.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
|
||||||
|
// Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
|
||||||
|
// If queueClient has already been Closed, you may chose to not call CompleteAsync() or AbandonAsync() etc. calls
|
||||||
|
// to avoid unnecessary exceptions.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use this Handler to look at the exceptions received on the SessionPump
|
||||||
|
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
|
||||||
|
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
|
||||||
|
Console.WriteLine("Exception context for troubleshooting:");
|
||||||
|
Console.WriteLine($"- Endpoint: {context.Endpoint}");
|
||||||
|
Console.WriteLine($"- Entity Path: {context.EntityPath}");
|
||||||
|
Console.WriteLine($"- Executing Action: {context.Action}");
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task SendSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
|
||||||
|
{
|
||||||
|
const string SessionPrefix = "session";
|
||||||
|
|
||||||
|
if (numberOfSessions == 0 || messagesPerSession == 0)
|
||||||
|
{
|
||||||
|
await Task.FromResult(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfSessions; i++)
|
||||||
|
{
|
||||||
|
var messagesToSend = new List<Message>();
|
||||||
|
string sessionId = SessionPrefix + i;
|
||||||
|
for (int j = 0; j < messagesPerSession; j++)
|
||||||
|
{
|
||||||
|
// Create a new message to send to the queue
|
||||||
|
string messageBody = "test" + j;
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
// Assign a SessionId for the message
|
||||||
|
message.SessionId = sessionId;
|
||||||
|
messagesToSend.Add(message);
|
||||||
|
|
||||||
|
// Write the sessionId, body of the message to the console
|
||||||
|
Console.WriteLine($"Sending SessionId: {message.SessionId}, message: {messageBody}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a batch of messages corresponding to this sessionId to the queue
|
||||||
|
await queueClient.SendAsync(messagesToSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"Sent {messagesPerSession} messages each for {numberOfSessions} sessions.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,201 @@
|
||||||
|
# Get started sending and receiving session based messages from Service Bus queues using QueueClient
|
||||||
|
|
||||||
|
In order to run the sample in this directory, replace the following bracketed values in the `Program.cs` file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// `Shared Access policies` section.
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string QueueName = "{Queue Name of a Queue that supports sessions}";
|
||||||
|
```
|
||||||
|
|
||||||
|
Once you replace the above values run the following from a command prompt:
|
||||||
|
|
||||||
|
```
|
||||||
|
dotnet restore
|
||||||
|
dotnet build
|
||||||
|
dotnet run
|
||||||
|
```
|
||||||
|
|
||||||
|
## The Sample Program
|
||||||
|
To keep things reasonably simple, the sample program keeps send and receive code within a single hosting application.
|
||||||
|
Typically in real world applications these roles are often spread across applications, services, or at least across
|
||||||
|
independently deployed and run tiers of applications or services. For clarity, the send and receive activities are kept as
|
||||||
|
separate methods as if they were different apps.
|
||||||
|
|
||||||
|
For further information on how to create this sample on your own, follow the rest of the tutorial.
|
||||||
|
|
||||||
|
## What will be accomplished
|
||||||
|
In this tutorial, we will write a console application to send and receive sessionful messages to a ServiceBus queue using a QueueClient.
|
||||||
|
Sessions are used in scenarios where User requires unbounded sequences of related messages. Messages within a session are always delivered
|
||||||
|
in a First In First Out Order. Sending session based messages to a queue using QueueClient is same as sending other messages but the
|
||||||
|
messages are stamped with an additional `SessionId` property. QueueClient offers a simple SessionPump model to receive messages related
|
||||||
|
to a session. Once a session handler is registered as shown below, the User code does not have to write explicit code to receive sessions
|
||||||
|
and if configured using `SessionHandlerOptions`, does not have to write explicit code to renew session locks or complete messages or improve
|
||||||
|
the degree of concurrency of session processing. Hence the queueClient can be used in scenarios where the User wants to get started quickly
|
||||||
|
or the scenarios where they need basic session based send/receive and wants to achieve that with as little code writing as possible.
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
1. [.NET Core](https://www.microsoft.com/net/core)
|
||||||
|
2. An Azure subscription.
|
||||||
|
3. [A ServiceBus namespace](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal)
|
||||||
|
4. [A ServiceBus queue](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues#2-create-a-queue-using-the-azure-portal)
|
||||||
|
|
||||||
|
### Create a console application
|
||||||
|
|
||||||
|
- Create a new .NET Core application. Check out [this link](https://docs.microsoft.com/en-us/dotnet/articles/core/getting-started) with help to create a new application on your operating system.
|
||||||
|
|
||||||
|
### Add the ServiceBus client reference
|
||||||
|
|
||||||
|
1. Add the following to your project.json, making sure that the solution references the `Microsoft.Azure.ServiceBus` project.
|
||||||
|
|
||||||
|
```json
|
||||||
|
"Microsoft.Azure.ServiceBus": "1.0.0"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Write some code to send and receive messages from the queue
|
||||||
|
1. Add the following using statement to the top of the Program.cs file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following variables to the `Program` class, and replace the placeholder values:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string QueueName = "{Queue Name of a Queue that supports sessions}";
|
||||||
|
static IQueueClient queueClient;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new Task called `ProcessSessionMessagesAsync` that knows how to handle received messages from a session with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await session.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
|
||||||
|
// Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
|
||||||
|
// If queueClient has already been Closed, you may chose to not call CompleteAsync() or AbandonAsync() etc. calls
|
||||||
|
// to avoid unnecessary exceptions.
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new Task called `ExceptionReceivedHandler` to look at the exceptions received on the MessagePump. This will be useful for debugging purposes.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Use this Handler to look at the exceptions received on the SessionPump
|
||||||
|
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called 'RegisterOnSessionHandlerAndReceiveSessionMessages' to register the `ProcessSessionMessagesAsync` and the
|
||||||
|
`ExceptionReceivedHandler` with the necessary `SessionHandlerOptions` parameters to start receiving messages from sessions
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static void RegisterOnSessionHandlerAndReceiveSessionMessages()
|
||||||
|
{
|
||||||
|
// Configure the SessionHandler Options in terms of exception handling, number of concurrent sessions to deliver etc.
|
||||||
|
var sessionHandlerOptions =
|
||||||
|
new SessionHandlerOptions(ExceptionReceivedHandler)
|
||||||
|
{
|
||||||
|
// Maximum number of Concurrent calls to the callback `ProcessSessionMessagesAsync`
|
||||||
|
// Value 2 below indicates the callback can be called with a message for 2 unique
|
||||||
|
// session Id's in parallel. Set it according to how many messages the application
|
||||||
|
// wants to process in parallel.
|
||||||
|
MaxConcurrentSessions = 2,
|
||||||
|
|
||||||
|
// Indicates the maximum time the Session Pump should wait for receiving messages for sessions.
|
||||||
|
// If no message is received within the specified time, the pump will close that session and try to get messages
|
||||||
|
// from a different session. Default is to wait for 1 minute to fetch messages for a session. Set to a 1 second
|
||||||
|
// value here to allow the sample execution to finish fast but ideally leave this as 1 minute unless there
|
||||||
|
// is a specific reason to timeout earlier.
|
||||||
|
MessageWaitTimeout = TimeSpan.FromSeconds(1),
|
||||||
|
|
||||||
|
// Indicates whether SessionPump should automatically complete the messages after returning from User Callback.
|
||||||
|
// False below indicates the Complete will be handled by the User Callback as in `ProcessSessionMessagesAsync`.
|
||||||
|
AutoComplete = false
|
||||||
|
};
|
||||||
|
|
||||||
|
// Register the function that will process session messages
|
||||||
|
queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `SendSessionMessagesAsync` that sends sessionful messages to the queue with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task SendSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
|
||||||
|
{
|
||||||
|
const string SessionPrefix = "session";
|
||||||
|
|
||||||
|
if (numberOfSessions == 0 || messagesPerSession == 0)
|
||||||
|
{
|
||||||
|
await Task.FromResult(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfSessions; i++)
|
||||||
|
{
|
||||||
|
var messagesToSend = new List<Message>();
|
||||||
|
string sessionId = SessionPrefix + i;
|
||||||
|
for (int j = 0; j < messagesPerSession; j++)
|
||||||
|
{
|
||||||
|
// Create a new message to send to the queue
|
||||||
|
string messageBody = "test" + j;
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
// Assign a SessionId for the message
|
||||||
|
message.SessionId = sessionId;
|
||||||
|
messagesToSend.Add(message);
|
||||||
|
|
||||||
|
// Write the sessionId, body of the message to the console
|
||||||
|
Console.WriteLine($"Sending SessionId: {message.SessionId}, message: {messageBody}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a batch of messages corresponding to this sessionId to the queue
|
||||||
|
await queueClient.SendAsync(messagesToSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"Sent {messagesPerSession} messages each for {numberOfSessions} sessions.");
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `MainAsync` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task MainAsync(string[] args)
|
||||||
|
{
|
||||||
|
const int numberOfSessions = 5;
|
||||||
|
const int numberOfMessagesPerSession = 3;
|
||||||
|
|
||||||
|
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
|
||||||
|
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
Console.WriteLine("Press any key to exit after receiving all the messages.");
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
|
||||||
|
// Register Session Handler and Receive Session Messages
|
||||||
|
RegisterOnSessionHandlerAndReceiveSessionMessages();
|
||||||
|
|
||||||
|
// Send messages with sessionId set
|
||||||
|
await SendSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await queueClient.CloseAsync();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following code to the `Main` method:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
MainAsync(args).GetAwaiter().GetResult();
|
||||||
|
```
|
||||||
|
|
||||||
|
Congratulations! You have now sent and received session based messages to a ServiceBus queue, using QueueClient.
|
|
@ -0,0 +1,52 @@
|
||||||
|
|
||||||
|
Microsoft Visual Studio Solution File, Format Version 12.00
|
||||||
|
# Visual Studio 15
|
||||||
|
VisualStudioVersion = 15.0.26430.16
|
||||||
|
MinimumVisualStudioVersion = 10.0.40219.1
|
||||||
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BasicSendReceiveUsingQueueClient", "BasicSendReceiveUsingQueueClient\BasicSendReceiveUsingQueueClient.csproj", "{C7403021-8C99-4FD2-AD6D-943F1ABB439F}"
|
||||||
|
EndProject
|
||||||
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SendReceiveUsingMessageSenderReceiver", "SendReceiveUsingMessageSenderReceiver\SendReceiveUsingMessageSenderReceiver.csproj", "{45D687AA-081B-4D4E-82C3-58A2BD5F02A8}"
|
||||||
|
EndProject
|
||||||
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BasicSessionSendReceiveUsingQueueClient", "BasicSessionSendReceiveUsingQueueClient\BasicSessionSendReceiveUsingQueueClient.csproj", "{3F7B9F6A-0B84-4B4E-A68F-77D4E35EDD57}"
|
||||||
|
EndProject
|
||||||
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SessionSendReceiveUsingSessionClient", "SessionSendReceiveUsingSessionClient\SessionSendReceiveUsingSessionClient.csproj", "{C61629D8-2605-4554-8747-9503572E2B2D}"
|
||||||
|
EndProject
|
||||||
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BasicSendReceiveUsingTopicSubscriptionClient", "BasicSendReceiveUsingTopicSubscriptionClient\BasicSendReceiveUsingTopicSubscriptionClient.csproj", "{BAA95098-988E-48A1-B8B5-5367B7B2A667}"
|
||||||
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TopicSubscriptionWithRuleOperationsSample", "TopicSubscriptionWithRuleOperationsSample\TopicSubscriptionWithRuleOperationsSample.csproj", "{BCFACFBC-0DFE-4444-8682-AEFA98D1E398}"
|
||||||
|
EndProject
|
||||||
|
Global
|
||||||
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
|
Debug|Any CPU = Debug|Any CPU
|
||||||
|
Release|Any CPU = Release|Any CPU
|
||||||
|
EndGlobalSection
|
||||||
|
GlobalSection(ProjectConfigurationPlatforms) = postSolution
|
||||||
|
{C7403021-8C99-4FD2-AD6D-943F1ABB439F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{C7403021-8C99-4FD2-AD6D-943F1ABB439F}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{C7403021-8C99-4FD2-AD6D-943F1ABB439F}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{C7403021-8C99-4FD2-AD6D-943F1ABB439F}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{45D687AA-081B-4D4E-82C3-58A2BD5F02A8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{45D687AA-081B-4D4E-82C3-58A2BD5F02A8}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{45D687AA-081B-4D4E-82C3-58A2BD5F02A8}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{45D687AA-081B-4D4E-82C3-58A2BD5F02A8}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{3F7B9F6A-0B84-4B4E-A68F-77D4E35EDD57}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{3F7B9F6A-0B84-4B4E-A68F-77D4E35EDD57}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{3F7B9F6A-0B84-4B4E-A68F-77D4E35EDD57}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{3F7B9F6A-0B84-4B4E-A68F-77D4E35EDD57}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{C61629D8-2605-4554-8747-9503572E2B2D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{C61629D8-2605-4554-8747-9503572E2B2D}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{C61629D8-2605-4554-8747-9503572E2B2D}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{C61629D8-2605-4554-8747-9503572E2B2D}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{BAA95098-988E-48A1-B8B5-5367B7B2A667}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{BAA95098-988E-48A1-B8B5-5367B7B2A667}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{BAA95098-988E-48A1-B8B5-5367B7B2A667}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{BAA95098-988E-48A1-B8B5-5367B7B2A667}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{BCFACFBC-0DFE-4444-8682-AEFA98D1E398}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{BCFACFBC-0DFE-4444-8682-AEFA98D1E398}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{BCFACFBC-0DFE-4444-8682-AEFA98D1E398}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{BCFACFBC-0DFE-4444-8682-AEFA98D1E398}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
EndGlobalSection
|
||||||
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
|
HideSolutionNode = FALSE
|
||||||
|
EndGlobalSection
|
||||||
|
EndGlobal
|
|
@ -0,0 +1,87 @@
|
||||||
|
// Copyright (c) Microsoft. All rights reserved.
|
||||||
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
|
namespace SendReceiveUsingMessageSenderReceiver
|
||||||
|
{
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
using Microsoft.Azure.ServiceBus.Core;
|
||||||
|
using System;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
class Program
|
||||||
|
{
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// 'Shared Access policies' section.
|
||||||
|
const string ServiceBusConnectionString = "{ServiceBus connection string}";
|
||||||
|
const string QueueName = "{Queue Name}";
|
||||||
|
static IMessageSender messageSender;
|
||||||
|
static IMessageReceiver messageReceiver;
|
||||||
|
|
||||||
|
static void Main(string[] args)
|
||||||
|
{
|
||||||
|
MainAsync().GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task MainAsync()
|
||||||
|
{
|
||||||
|
const int numberOfMessages = 10;
|
||||||
|
messageSender = new MessageSender(ServiceBusConnectionString, QueueName);
|
||||||
|
messageReceiver = new MessageReceiver(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
|
||||||
|
|
||||||
|
// Send Messages
|
||||||
|
await SendMessagesAsync(numberOfMessages);
|
||||||
|
|
||||||
|
// Receive Messages
|
||||||
|
await ReceiveMessagesAsync(numberOfMessages);
|
||||||
|
|
||||||
|
Console.WriteLine("=========================================================");
|
||||||
|
Console.WriteLine("Completed Receiving all messages... Press any key to exit");
|
||||||
|
Console.WriteLine("=========================================================");
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await messageSender.CloseAsync();
|
||||||
|
await messageReceiver.CloseAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task ReceiveMessagesAsync(int numberOfMessagesToReceive)
|
||||||
|
{
|
||||||
|
while(numberOfMessagesToReceive-- > 0)
|
||||||
|
{
|
||||||
|
// Receive the message
|
||||||
|
Message message = await messageReceiver.ReceiveAsync();
|
||||||
|
|
||||||
|
// Process the message
|
||||||
|
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the MessageReceiver is created in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task SendMessagesAsync(int numberOfMessagesToSend)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (var i = 0; i < numberOfMessagesToSend; i++)
|
||||||
|
{
|
||||||
|
// Create a new message to send to the queue
|
||||||
|
string messageBody = $"Message {i}";
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
|
||||||
|
// Write the body of the message to the console
|
||||||
|
Console.WriteLine($"Sending message: {messageBody}");
|
||||||
|
|
||||||
|
// Send the message to the queue
|
||||||
|
await messageSender.SendAsync(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
|
<PropertyGroup>
|
||||||
|
<OutputType>Exe</OutputType>
|
||||||
|
<TargetFramework>netcoreapp2.0</TargetFramework>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
</Project>
|
|
@ -0,0 +1,144 @@
|
||||||
|
# Get started sending and receiving messages from Service Bus queues using MessageSender and MessageReceiver
|
||||||
|
|
||||||
|
In order to run the sample in this directory, replace the following bracketed values in the `Program.cs` file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// `Shared Access policies` section.
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string QueueName = "{Queue Name}";
|
||||||
|
```
|
||||||
|
|
||||||
|
Once you replace the above values run the following from a command prompt:
|
||||||
|
|
||||||
|
```
|
||||||
|
dotnet restore
|
||||||
|
dotnet build
|
||||||
|
dotnet run
|
||||||
|
```
|
||||||
|
|
||||||
|
## The Sample Program
|
||||||
|
To keep things reasonably simple, the sample program keeps send and receive code within a single hosting application.
|
||||||
|
Typically in real world applications these roles are often spread across applications, services, or at least across
|
||||||
|
independently deployed and run tiers of applications or services. For clarity, the send and receive activities are kept as
|
||||||
|
separate methods as if they were different apps.
|
||||||
|
|
||||||
|
For further information on how to create this sample on your own, follow the rest of the tutorial.
|
||||||
|
|
||||||
|
## What will be accomplished
|
||||||
|
In this tutorial, we will write a console application to send and receive messages to a ServiceBus queue using MessageSender and MessageReceiver.
|
||||||
|
MessageSender and MessageReceiver APIs offer a more richer API surface in terms of being able to Defer Messages, Receive Deferred Messages,
|
||||||
|
Peek Messages etc. Thus it allows the User a more granular control on processing of messages than `QueueClient` but that also means the User has
|
||||||
|
to write more code to renew message locks, complete messages and define how to achieve a basic degree of concurrency while processing messages.
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
1. [.NET Core](https://www.microsoft.com/net/core)
|
||||||
|
2. An Azure subscription.
|
||||||
|
3. [A ServiceBus namespace](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal)
|
||||||
|
4. [A ServiceBus queue](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues#2-create-a-queue-using-the-azure-portal)
|
||||||
|
|
||||||
|
### Create a console application
|
||||||
|
|
||||||
|
- Create a new .NET Core application. Check out [this link](https://docs.microsoft.com/en-us/dotnet/articles/core/getting-started) with help to create a new application on your operating system.
|
||||||
|
|
||||||
|
### Add the ServiceBus client reference
|
||||||
|
|
||||||
|
1. Add the following to your project.json, making sure that the solution references the `Microsoft.Azure.ServiceBus` project.
|
||||||
|
|
||||||
|
```json
|
||||||
|
"Microsoft.Azure.ServiceBus": "1.0.0"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Write some code to send and receive messages from the queue
|
||||||
|
1. Add the following using statement to the top of the Program.cs file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following variables to the `Program` class, and replace the placeholder values:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string QueueName = "{Queue Name}";
|
||||||
|
static IMessageSender messageSender;
|
||||||
|
static IMessageReceiver messageReceiver;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new Task called `ReceiveMessagesAsync` that knows how to handle received messages with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task ReceiveMessagesAsync(int numberOfMessagesToReceive)
|
||||||
|
{
|
||||||
|
while(numberOfMessagesToReceive-- > 0)
|
||||||
|
{
|
||||||
|
// Receive the message
|
||||||
|
Message message = await messageReceiver.ReceiveAsync();
|
||||||
|
|
||||||
|
// Process the message
|
||||||
|
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the MessageReceiver is created in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
1. Create a new method called `SendMessagesToQueue` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Sends messages to the queue.
|
||||||
|
static async Task SendMessagesAsync(int numberOfMessagesToSend)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (var i = 0; i < numberOfMessagesToSend; i++)
|
||||||
|
{
|
||||||
|
// Create a new message to send to the queue
|
||||||
|
string messageBody = $"Message {i}";
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
|
||||||
|
// Write the body of the message to the console
|
||||||
|
Console.WriteLine($"Sending message: {messageBody}");
|
||||||
|
|
||||||
|
// Send the message to the queue
|
||||||
|
await messageSender.SendAsync(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `MainAsync` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task MainAsync(string[] args)
|
||||||
|
{
|
||||||
|
messageSender = new MessageSender(ServiceBusConnectionString, QueueName);
|
||||||
|
messageReceiver = new MessageReceiver(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
|
||||||
|
|
||||||
|
// Send Messages
|
||||||
|
await SendMessagesToQueue(numberOfMessages);
|
||||||
|
|
||||||
|
// Receive Messages
|
||||||
|
await ReceiveMessagesAsync(numberOfMessages);
|
||||||
|
|
||||||
|
Console.WriteLine("Completed Receiving all messages... Press any key to exit");
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
// Close the messageSender and messageReceiver after processing all needed messages.
|
||||||
|
await messageSender.CloseAsync();
|
||||||
|
await messageReceiver.CloseAsync();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following code to the `Main` method:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
MainAsync(args).GetAwaiter().GetResult();
|
||||||
|
```
|
||||||
|
|
||||||
|
Congratulations! You have now sent and received messages to a ServiceBus queue using MessageSender and MessageReceiver.
|
|
@ -0,0 +1,125 @@
|
||||||
|
// Copyright (c) Microsoft. All rights reserved.
|
||||||
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
|
namespace SessionSendReceiveUsingSessionClient
|
||||||
|
{
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
using Microsoft.Azure.ServiceBus.Core;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
class Program
|
||||||
|
{
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// 'Shared Access policies' section.
|
||||||
|
const string ServiceBusConnectionString = "{ServiceBus connection string}";
|
||||||
|
const string QueueName = "{Queue Name of a Queue that supports sessions}";
|
||||||
|
static IMessageSender messageSender;
|
||||||
|
static ISessionClient sessionClient;
|
||||||
|
const string SessionPrefix = "session-prefix";
|
||||||
|
|
||||||
|
static void Main(string[] args)
|
||||||
|
{
|
||||||
|
MainAsync().GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task MainAsync()
|
||||||
|
{
|
||||||
|
const int numberOfSessions = 5;
|
||||||
|
const int numberOfMessagesPerSession = 3;
|
||||||
|
|
||||||
|
messageSender = new MessageSender(ServiceBusConnectionString, QueueName);
|
||||||
|
sessionClient = new SessionClient(ServiceBusConnectionString, QueueName);
|
||||||
|
|
||||||
|
// Send messages with sessionId set
|
||||||
|
await SendSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);
|
||||||
|
|
||||||
|
// Receive all Session based messages using SessionClient
|
||||||
|
await ReceiveSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);
|
||||||
|
|
||||||
|
Console.WriteLine("=========================================================");
|
||||||
|
Console.WriteLine("Completed Receiving all messages... Press any key to exit");
|
||||||
|
Console.WriteLine("=========================================================");
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await messageSender.CloseAsync();
|
||||||
|
await sessionClient.CloseAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task ReceiveSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
|
||||||
|
{
|
||||||
|
Console.WriteLine("===================================================================");
|
||||||
|
Console.WriteLine("Accepting sessions in the reverse order of sends for demo purposes");
|
||||||
|
Console.WriteLine("===================================================================");
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfSessions; i++)
|
||||||
|
{
|
||||||
|
int messagesReceivedPerSession = 0;
|
||||||
|
|
||||||
|
// AcceptMessageSessionAsync(i.ToString()) as below with session id as parameter will try to get a session with that sessionId.
|
||||||
|
// AcceptMessageSessionAsync() without any messages will try to get any available session with messages associated with that session.
|
||||||
|
IMessageSession session = await sessionClient.AcceptMessageSessionAsync(SessionPrefix + i.ToString());
|
||||||
|
|
||||||
|
if(session != null)
|
||||||
|
{
|
||||||
|
// Messages within a session will always arrive in order.
|
||||||
|
Console.WriteLine("=====================================");
|
||||||
|
Console.WriteLine($"Received Session: {session.SessionId}");
|
||||||
|
|
||||||
|
while (messagesReceivedPerSession++ < messagesPerSession)
|
||||||
|
{
|
||||||
|
Message message = await session.ReceiveAsync();
|
||||||
|
|
||||||
|
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await session.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"Received all messages for Session: {session.SessionId}");
|
||||||
|
Console.WriteLine("=====================================");
|
||||||
|
|
||||||
|
// Close the Session after receiving all messages from the session
|
||||||
|
await session.CloseAsync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task SendSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
|
||||||
|
{
|
||||||
|
if (numberOfSessions == 0 || messagesPerSession == 0)
|
||||||
|
{
|
||||||
|
await Task.FromResult(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = numberOfSessions - 1; i >= 0; i--)
|
||||||
|
{
|
||||||
|
var messagesToSend = new List<Message>();
|
||||||
|
string sessionId = SessionPrefix + i;
|
||||||
|
for (int j = 0; j < messagesPerSession; j++)
|
||||||
|
{
|
||||||
|
// Create a new message to send to the queue
|
||||||
|
string messageBody = "test" + j;
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
// Assign a SessionId for the message
|
||||||
|
message.SessionId = sessionId;
|
||||||
|
messagesToSend.Add(message);
|
||||||
|
|
||||||
|
// Write the sessionId, body of the message to the console
|
||||||
|
Console.WriteLine($"Sending SessionId: {message.SessionId}, message: {messageBody}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a batch of messages corresponding to this sessionId to the queue
|
||||||
|
await messageSender.SendAsync(messagesToSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine("=====================================");
|
||||||
|
Console.WriteLine($"Sent {messagesPerSession} messages each for {numberOfSessions} sessions.");
|
||||||
|
Console.WriteLine("=====================================");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
|
<PropertyGroup>
|
||||||
|
<OutputType>Exe</OutputType>
|
||||||
|
<TargetFramework>netcoreapp2.0</TargetFramework>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
</Project>
|
|
@ -0,0 +1,186 @@
|
||||||
|
# Get started sending and receiving session based messages from Service Bus queues using SessionClient
|
||||||
|
|
||||||
|
In order to run the sample in this directory, replace the following bracketed values in the `Program.cs` file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// `Shared Access policies` section.
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string QueueName = "{Queue Name of a Queue that supports sessions}";
|
||||||
|
```
|
||||||
|
|
||||||
|
Once you replace the above values run the following from a command prompt:
|
||||||
|
|
||||||
|
```
|
||||||
|
dotnet restore
|
||||||
|
dotnet build
|
||||||
|
dotnet run
|
||||||
|
```
|
||||||
|
|
||||||
|
## The Sample Program
|
||||||
|
To keep things reasonably simple, the sample program keeps send and receive code within a single hosting application.
|
||||||
|
Typically in real world applications these roles are often spread across applications, services, or at least across
|
||||||
|
independently deployed and run tiers of applications or services. For clarity, the send and receive activities are kept as
|
||||||
|
separate methods as if they were different apps.
|
||||||
|
|
||||||
|
For further information on how to create this sample on your own, follow the rest of the tutorial.
|
||||||
|
|
||||||
|
## What will be accomplished
|
||||||
|
In this tutorial, we will write a console application to send and receive sessionful messages to a ServiceBus queue using a SessionClient.
|
||||||
|
Sending session based messages to a queue using MessageSender is same as sending other messages but the messages are stamped with an additional
|
||||||
|
`SessionId` property. SessionClient offers a more granular control to the user for receiving Session based messages than `QueueClient`. The User
|
||||||
|
can explicitly choose to accept sessions with a particular `SessionId`, defer messages received from a session and accept deffered messages on that session.
|
||||||
|
But this also means the User has to write more code to accept MessageSessions, renew session locks, complete messages and
|
||||||
|
define how to achieve a basic degree of concurrency while processing sessions.
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
1. [.NET Core](https://www.microsoft.com/net/core)
|
||||||
|
2. An Azure subscription.
|
||||||
|
3. [A ServiceBus namespace](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal)
|
||||||
|
4. [A ServiceBus queue](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues#2-create-a-queue-using-the-azure-portal)
|
||||||
|
|
||||||
|
### Create a console application
|
||||||
|
|
||||||
|
- Create a new .NET Core application. Check out [this link](https://docs.microsoft.com/en-us/dotnet/articles/core/getting-started) with help to create a new application on your operating system.
|
||||||
|
|
||||||
|
### Add the ServiceBus client reference
|
||||||
|
|
||||||
|
1. Add the following to your project.json, making sure that the solution references the `Microsoft.Azure.ServiceBus` project.
|
||||||
|
|
||||||
|
```json
|
||||||
|
"Microsoft.Azure.ServiceBus": "1.0.0"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Write some code to send and receive messages from the queue
|
||||||
|
1. Add the following using statement to the top of the Program.cs file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following variables to the `Program` class, and replace the placeholder values:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string QueueName = "{Queue Name of a Queue that supports sessions}";
|
||||||
|
static IMessageSender messageSender;
|
||||||
|
static ISessionClient sessionClient;
|
||||||
|
const string SessionPrefix = "session-prefix";
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new Task called `ReceiveSessionMessagesAsync` that knows how to receive messages from a session using SessionClient with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task ReceiveSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
|
||||||
|
{
|
||||||
|
Console.WriteLine("===================================================================");
|
||||||
|
Console.WriteLine("Accepting sessions in the reverse order of sends for demo purposes");
|
||||||
|
Console.WriteLine("===================================================================");
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfSessions; i++)
|
||||||
|
{
|
||||||
|
int messagesReceivedPerSession = 0;
|
||||||
|
|
||||||
|
// AcceptMessageSessionAsync(i.ToString()) as below with session id as parameter will try to get a session with that sessionId.
|
||||||
|
// AcceptMessageSessionAsync() without any messages will try to get any available session with messages associated with that session.
|
||||||
|
IMessageSession session = await sessionClient.AcceptMessageSessionAsync(SessionPrefix + i.ToString());
|
||||||
|
|
||||||
|
if(session != null)
|
||||||
|
{
|
||||||
|
// Messages within a session will always arrive in order.
|
||||||
|
Console.WriteLine("=====================================");
|
||||||
|
Console.WriteLine($"Received Session: {session.SessionId}");
|
||||||
|
Console.WriteLine($"Receiving all messages for this Session");
|
||||||
|
|
||||||
|
while(messagesReceivedPerSession++ < messagesPerSession)
|
||||||
|
{
|
||||||
|
Message message = await session.ReceiveAsync();
|
||||||
|
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
|
||||||
|
|
||||||
|
// Complete the message so that it is not received again.
|
||||||
|
// This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
|
||||||
|
await session.CompleteAsync(message.SystemProperties.LockToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"Received all messages for Session: {session.SessionId}");
|
||||||
|
Console.WriteLine("=====================================");
|
||||||
|
|
||||||
|
// Close the Session after receiving all messages from the session
|
||||||
|
await session.CloseAsync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `SendSessionMessagesAsync` that sends sessionful messages to the queue with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task SendSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
|
||||||
|
{
|
||||||
|
if (numberOfSessions == 0 || messagesPerSession == 0)
|
||||||
|
{
|
||||||
|
await Task.FromResult(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = numberOfSessions - 1; i >= 0; i--)
|
||||||
|
{
|
||||||
|
var messagesToSend = new List<Message>();
|
||||||
|
string sessionId = SessionPrefix + i;
|
||||||
|
for (int j = 0; j < messagesPerSession; j++)
|
||||||
|
{
|
||||||
|
// Create a new message to send to the queue
|
||||||
|
string messageBody = "test" + j;
|
||||||
|
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
|
||||||
|
// Assign a SessionId for the message
|
||||||
|
message.SessionId = sessionId;
|
||||||
|
messagesToSend.Add(message);
|
||||||
|
|
||||||
|
// Write the sessionId, body of the message to the console
|
||||||
|
Console.WriteLine($"Sending SessionId: {message.SessionId}, message: {messageBody}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a batch of messages corresponding to this sessionId to the queue
|
||||||
|
await queueClient.SendAsync(messagesToSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine("=====================================");
|
||||||
|
Console.WriteLine($"Sent {messagesPerSession} messages each for {numberOfSessions} sessions.");
|
||||||
|
Console.WriteLine("=====================================");
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `MainAsync` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task MainAsync(string[] args)
|
||||||
|
{
|
||||||
|
const int numberOfSessions = 5;
|
||||||
|
const int numberOfMessagesPerSession = 3;
|
||||||
|
|
||||||
|
messageSender = new MessageSender(ServiceBusConnectionString, QueueName);
|
||||||
|
sessionClient = new SessionClient(ServiceBusConnectionString, QueueName);
|
||||||
|
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
Console.WriteLine("Press any key to exit after receiving all the messages.");
|
||||||
|
Console.WriteLine("======================================================");
|
||||||
|
|
||||||
|
// Send messages with sessionId set
|
||||||
|
await SendSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);
|
||||||
|
|
||||||
|
// Receive all Session based messages using SessionClient
|
||||||
|
await ReceiveSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await messageSender.CloseAsync();
|
||||||
|
await sessionClient.CloseAsync();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following code to the `Main` method:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
MainAsync(args).GetAwaiter().GetResult();
|
||||||
|
```
|
||||||
|
|
||||||
|
Congratulations! You have now sent and received session based messages to a ServiceBus queue, using SessionClient.
|
|
@ -0,0 +1,183 @@
|
||||||
|
// Copyright (c) Microsoft. All rights reserved.
|
||||||
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
|
namespace TopicSubscriptionWithRuleOperationsSample
|
||||||
|
{
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
using Microsoft.Azure.ServiceBus.Core;
|
||||||
|
using System;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
class Program
|
||||||
|
{
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// 'Shared Access policies' section.
|
||||||
|
const string ServiceBusConnectionString = "{ServiceBus connection string}";
|
||||||
|
const string TopicName = "{Topic Name}";
|
||||||
|
|
||||||
|
// Simply create 4 default subscriptions (no rules specified explicitly) and provide subscription names.
|
||||||
|
// The Rule addition will be done as part of the sample depending on the subscription behavior expected.
|
||||||
|
const string allMessagesSubscriptionName = "{Subscription 1 Name}";
|
||||||
|
const string sqlFilterOnlySubscriptionName = "{Subscription 2 Name}";
|
||||||
|
const string sqlFilterWithActionSubscriptionName = "{Subscription 3 Name}";
|
||||||
|
const string correlationFilterSubscriptionName = "{Subscription 4 Name}";
|
||||||
|
|
||||||
|
static ITopicClient topicClient;
|
||||||
|
static ISubscriptionClient allMessagessubscriptionClient, sqlFilterOnlySubscriptionClient, sqlFilterWithActionSubscriptionClient, correlationFilterSubscriptionClient;
|
||||||
|
|
||||||
|
static void Main(string[] args)
|
||||||
|
{
|
||||||
|
MainAsync().GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task MainAsync()
|
||||||
|
{
|
||||||
|
topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
|
||||||
|
allMessagessubscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, allMessagesSubscriptionName);
|
||||||
|
sqlFilterOnlySubscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, sqlFilterOnlySubscriptionName);
|
||||||
|
sqlFilterWithActionSubscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, sqlFilterWithActionSubscriptionName);
|
||||||
|
correlationFilterSubscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, correlationFilterSubscriptionName);
|
||||||
|
|
||||||
|
// First Subscription is already created with default rule. Leave as is.
|
||||||
|
|
||||||
|
// 2nd Subscription: Add SqlFilter on Subscription 2
|
||||||
|
// Delete Default Rule.
|
||||||
|
// Add the required SqlFilter Rule
|
||||||
|
// Note: Does not apply to this sample but if there are multiple rules configured for a
|
||||||
|
// single subscription, then one message is delivered to the subscription when any of the
|
||||||
|
// rule matches. If more than one rules match and if there is no `SqlRuleAction` set for the
|
||||||
|
// rule, then only one message will be delivered to the subscription. If more than one rules
|
||||||
|
// match and there is a `SqlRuleAction` specified for the rule, then one message per `SqlRuleAction`
|
||||||
|
// is delivered to the subscription.
|
||||||
|
Console.WriteLine($"SubscriptionName: {sqlFilterOnlySubscriptionName}, Removing Default Rule and Adding SqlFilter");
|
||||||
|
await sqlFilterOnlySubscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
|
||||||
|
await sqlFilterOnlySubscriptionClient.AddRuleAsync(new RuleDescription
|
||||||
|
{
|
||||||
|
Filter = new SqlFilter("Color = 'Red'"),
|
||||||
|
Name = "RedSqlRule"
|
||||||
|
});
|
||||||
|
|
||||||
|
// 3rd Subscription: Add SqlFilter and SqlRuleAction on Subscription 3
|
||||||
|
// Delete Default Rule
|
||||||
|
// Add the required SqlFilter Rule and Action
|
||||||
|
Console.WriteLine($"SubscriptionName: {sqlFilterWithActionSubscriptionName}, Removing Default Rule and Adding SqlFilter and SqlRuleAction");
|
||||||
|
await sqlFilterWithActionSubscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
|
||||||
|
await sqlFilterWithActionSubscriptionClient.AddRuleAsync(new RuleDescription
|
||||||
|
{
|
||||||
|
Filter = new SqlFilter("Color = 'Blue'"),
|
||||||
|
Action = new SqlRuleAction("SET Color = 'BlueProcessed'"),
|
||||||
|
Name = "BlueSqlRule"
|
||||||
|
});
|
||||||
|
|
||||||
|
// 4th Subscription: Add Correlation Filter on Subscription 4
|
||||||
|
Console.WriteLine($"SubscriptionName: {sqlFilterWithActionSubscriptionName}, Removing Default Rule and Adding CorrelationFilter");
|
||||||
|
await correlationFilterSubscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
|
||||||
|
await correlationFilterSubscriptionClient.AddRuleAsync(new RuleDescription
|
||||||
|
{
|
||||||
|
Filter = new CorrelationFilter() { Label = "Red", CorrelationId = "important" },
|
||||||
|
Name = "ImportantCorrelationRule"
|
||||||
|
});
|
||||||
|
|
||||||
|
// Get Rules on Subscription, called here only for one subscription as example
|
||||||
|
var rules = (await correlationFilterSubscriptionClient.GetRulesAsync()).ToList();
|
||||||
|
Console.WriteLine($"GetRules:: SubscriptionName: {correlationFilterSubscriptionName}, CorrelationFilter Name: {rules[0].Name}, Rule: {rules[0].Filter}");
|
||||||
|
|
||||||
|
// Send messages to Topic
|
||||||
|
await SendMessagesAsync();
|
||||||
|
|
||||||
|
// Receive messages from 'allMessagesSubscriptionName'. Should receive all 9 messages
|
||||||
|
await ReceiveMessagesAsync(allMessagesSubscriptionName);
|
||||||
|
|
||||||
|
// Receive messages from 'sqlFilterOnlySubscriptionName'. Should receive all messages with Color = 'Red' i.e 3 messages
|
||||||
|
await ReceiveMessagesAsync(sqlFilterOnlySubscriptionName);
|
||||||
|
|
||||||
|
// Receive messages from 'sqlFilterWithActionSubscriptionClient'. Should receive all messages with Color = 'Blue'
|
||||||
|
// i.e 3 messages AND all messages should have color set to 'BlueProcessed'
|
||||||
|
await ReceiveMessagesAsync(sqlFilterWithActionSubscriptionName);
|
||||||
|
|
||||||
|
// Receive messages from 'correlationFilterSubscriptionName'. Should receive all messages with Color = 'Red' and CorrelationId = "important"
|
||||||
|
// i.e 1 message
|
||||||
|
await ReceiveMessagesAsync(correlationFilterSubscriptionName);
|
||||||
|
|
||||||
|
Console.WriteLine("=========================================================");
|
||||||
|
Console.WriteLine("Completed Receiving all messages... Press any key to exit");
|
||||||
|
Console.WriteLine("=========================================================");
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await allMessagessubscriptionClient.CloseAsync();
|
||||||
|
await sqlFilterOnlySubscriptionClient.CloseAsync();
|
||||||
|
await sqlFilterWithActionSubscriptionClient.CloseAsync();
|
||||||
|
await correlationFilterSubscriptionClient.CloseAsync();
|
||||||
|
await topicClient.CloseAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task SendMessagesAsync()
|
||||||
|
{
|
||||||
|
Console.WriteLine($"==========================================================================");
|
||||||
|
Console.WriteLine("Sending Messages to Topic");
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.WhenAll(
|
||||||
|
SendMessageAsync(label: "Red"),
|
||||||
|
SendMessageAsync(label: "Blue"),
|
||||||
|
SendMessageAsync(label: "Red", correlationId: "important"),
|
||||||
|
SendMessageAsync(label: "Blue", correlationId: "important"),
|
||||||
|
SendMessageAsync(label: "Red", correlationId: "notimportant"),
|
||||||
|
SendMessageAsync(label: "Blue", correlationId: "notimportant"),
|
||||||
|
SendMessageAsync(label: "Green"),
|
||||||
|
SendMessageAsync(label: "Green", correlationId: "important"),
|
||||||
|
SendMessageAsync(label: "Green", correlationId: "notimportant")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task SendMessageAsync(string label, string correlationId = null)
|
||||||
|
{
|
||||||
|
Message message = new Message { Label = label };
|
||||||
|
message.UserProperties.Add("Color", label);
|
||||||
|
|
||||||
|
if (correlationId != null)
|
||||||
|
{
|
||||||
|
message.CorrelationId = correlationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
await topicClient.SendAsync(message);
|
||||||
|
Console.WriteLine($"Sent Message:: Label: {message.Label}, CorrelationId: {message.CorrelationId ?? message.CorrelationId}");
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task ReceiveMessagesAsync(string subscriptionName)
|
||||||
|
{
|
||||||
|
string subscriptionPath = EntityNameHelper.FormatSubscriptionPath(TopicName, subscriptionName);
|
||||||
|
IMessageReceiver subscriptionReceiver = new MessageReceiver(ServiceBusConnectionString, subscriptionPath, ReceiveMode.ReceiveAndDelete);
|
||||||
|
|
||||||
|
Console.WriteLine($"==========================================================================");
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Receiving Messages From Subscription: {subscriptionName}");
|
||||||
|
int receivedMessageCount = 0;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
var receivedMessage = await subscriptionReceiver.ReceiveAsync(TimeSpan.Zero);
|
||||||
|
if (receivedMessage != null)
|
||||||
|
{
|
||||||
|
object colorProperty;
|
||||||
|
receivedMessage.UserProperties.TryGetValue("Color", out colorProperty);
|
||||||
|
Console.WriteLine($"Color Property = {colorProperty}, CorrelationId = {receivedMessage.CorrelationId ?? receivedMessage.CorrelationId}");
|
||||||
|
receivedMessageCount++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Received '{receivedMessageCount}' Messages From Subscription: {subscriptionName}");
|
||||||
|
Console.WriteLine($"==========================================================================");
|
||||||
|
await subscriptionReceiver.CloseAsync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
|
<PropertyGroup>
|
||||||
|
<OutputType>Exe</OutputType>
|
||||||
|
<TargetFramework>netcoreapp2.0</TargetFramework>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
</Project>
|
|
@ -0,0 +1,255 @@
|
||||||
|
# Get started configuring and managing rules for Subscriptions
|
||||||
|
|
||||||
|
In order to run the sample in this directory, replace the following bracketed values in the `Program.cs` file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Connection String for the namespace can be obtained from the Azure portal under the
|
||||||
|
// `Shared Access policies` section.
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string TopicName = "{Topic Name}";
|
||||||
|
|
||||||
|
// Simply create 4 default subscriptions (no rules specified explicitly) and provide subscription names.
|
||||||
|
// The Rule addition will be done as part of the sample depending on the subscription behavior expected.
|
||||||
|
const string allMessagesSubscriptionName = "{Subscription 1 Name}";
|
||||||
|
const string sqlFilterOnlySubscriptionName = "{Subscription 2 Name}";
|
||||||
|
const string sqlFilterWithActionSubscriptionName = "{Subscription 3 Name}";
|
||||||
|
const string correlationFilterSubscriptionName = "{Subscription 4 Name}";
|
||||||
|
```
|
||||||
|
|
||||||
|
Once you replace the above values run the following from a command prompt:
|
||||||
|
|
||||||
|
```
|
||||||
|
dotnet restore
|
||||||
|
dotnet build
|
||||||
|
dotnet run
|
||||||
|
```
|
||||||
|
|
||||||
|
## The Sample Program
|
||||||
|
To keep things reasonably simple, the sample program keeps send and receive code within a single hosting application.
|
||||||
|
Typically in real world applications these roles are often spread across applications, services, or at least across
|
||||||
|
independently deployed and run tiers of applications or services. For clarity, the send and receive activities are kept as
|
||||||
|
separate methods as if they were different apps.
|
||||||
|
|
||||||
|
For further information on how to create this sample on your own, follow the rest of the tutorial.
|
||||||
|
|
||||||
|
## What will be accomplished
|
||||||
|
Topics are similar to Queues for the send side of the application. However unlike Queues, Topic can have zero or more subscriptions,
|
||||||
|
from which messages can be retrieved and each of subscription act like independent queues. Whether a message is selected into the
|
||||||
|
subscription is determined by the Filter condition for the subscription. Filters can be one of the following:
|
||||||
|
|
||||||
|
1. `TrueFilter` - Selects all messages to subscription,
|
||||||
|
2. `FalseFilter` - Selects none of the messages to subscription,
|
||||||
|
3. `SqlFilter` - Holds a SQL-like condition expression that is evaluated in the ServiceBus service against the arriving messages'
|
||||||
|
user-defined properties and system properties and if matched the message is selected for subscription.
|
||||||
|
4. `CorrelationFilter` - Holds a set of conditions that is evaluated in the ServiceBus service against the arriving messages'
|
||||||
|
user-defined properties and system properties. A match exists when an arriving message's value for a property is equal to the
|
||||||
|
value specified in the correlation filter.
|
||||||
|
|
||||||
|
In this tutorial, we will write a console application to manage rules on Subscription (`AddRule`, `GetRules`, `RemoveRules`).
|
||||||
|
We will also explore different forms of subscription filters. Refer to the
|
||||||
|
link(https://github.com/Azure/azure-service-bus/tree/master/samples/DotNet/Microsoft.ServiceBus.Messaging/TopicFilters) for a more
|
||||||
|
detailed explanation of filters.
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
1. [.NET Core](https://www.microsoft.com/net/core)
|
||||||
|
2. An Azure subscription.
|
||||||
|
3. [A ServiceBus namespace](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal)
|
||||||
|
4. [A ServiceBus Topic](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions#2-create-a-topic-using-the-azure-portal)
|
||||||
|
5. [ServiceBus Subscriptions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions)
|
||||||
|
|
||||||
|
### Create a console application
|
||||||
|
|
||||||
|
- Create a new .NET Core application. Check out [this link](https://docs.microsoft.com/en-us/dotnet/articles/core/getting-started) with help to create a new application on your operating system.
|
||||||
|
|
||||||
|
### Add the ServiceBus client reference
|
||||||
|
|
||||||
|
1. Add the following to your project.json, making sure that the solution references the `Microsoft.Azure.ServiceBus` project.
|
||||||
|
|
||||||
|
```json
|
||||||
|
"Microsoft.Azure.ServiceBus": "1.0.0"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Write some code to send messages to the topic, manage rules and receive messages from the subscription
|
||||||
|
1. Add the following using statement to the top of the Program.cs file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
using Microsoft.Azure.ServiceBus;
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following variables to the `Program` class, and replace the placeholder values:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
const string ServiceBusConnectionString = "{Service Bus connection string}";
|
||||||
|
const string TopicName = "{Topic Name}";
|
||||||
|
const string allMessagesSubscriptionName = "{Subscription 1 Name}";
|
||||||
|
const string sqlFilterOnlySubscriptionName = "{Subscription 2 Name}";
|
||||||
|
const string sqlFilterWithActionSubscriptionName = "{Subscription 3 Name}";
|
||||||
|
const string correlationFilterSubscriptionName = "{Subscription 4 Name}";
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create the following methods that will send messages with various combinations to the topic:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task SendMessagesAsync()
|
||||||
|
{
|
||||||
|
Console.WriteLine($"==========================================================================");
|
||||||
|
Console.WriteLine("Sending Messages to Topic");
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.WhenAll(
|
||||||
|
SendMessageAsync(label: "Red"),
|
||||||
|
SendMessageAsync(label: "Blue"),
|
||||||
|
SendMessageAsync(label: "Red", correlationId: "important"),
|
||||||
|
SendMessageAsync(label: "Blue", correlationId: "important"),
|
||||||
|
SendMessageAsync(label: "Red", correlationId: "notimportant"),
|
||||||
|
SendMessageAsync(label: "Blue", correlationId: "notimportant"),
|
||||||
|
SendMessageAsync(label: "Green"),
|
||||||
|
SendMessageAsync(label: "Green", correlationId: "important"),
|
||||||
|
SendMessageAsync(label: "Green", correlationId: "notimportant")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task SendMessageAsync(string label, string correlationId = null)
|
||||||
|
{
|
||||||
|
Message message = new Message { Label = label };
|
||||||
|
message.UserProperties.Add("Color", label);
|
||||||
|
|
||||||
|
if (correlationId != null)
|
||||||
|
{
|
||||||
|
message.CorrelationId = correlationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
await topicClient.SendAsync(message);
|
||||||
|
Console.WriteLine($"Sent Message:: Label: {message.Label}, CorrelationId: {message.CorrelationId ?? message.CorrelationId}");
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method Task `ReceiveMessagesAsync` with the following code to process messages from a given subscription:
|
||||||
|
```csharp
|
||||||
|
static async Task ReceiveMessagesAsync(string subscriptionName)
|
||||||
|
{
|
||||||
|
string subscriptionPath = EntityNameHelper.FormatSubscriptionPath(TopicName, subscriptionName);
|
||||||
|
IMessageReceiver subscriptionReceiver = new MessageReceiver(ServiceBusConnectionString, subscriptionPath, ReceiveMode.ReceiveAndDelete);
|
||||||
|
|
||||||
|
Console.WriteLine($"==========================================================================");
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Receiving Messages From Subscription: {subscriptionName}");
|
||||||
|
int receivedMessageCount = 0;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
var receivedMessage = await subscriptionReceiver.ReceiveAsync(TimeSpan.Zero);
|
||||||
|
if (receivedMessage != null)
|
||||||
|
{
|
||||||
|
object colorProperty;
|
||||||
|
receivedMessage.UserProperties.TryGetValue("Color", out colorProperty);
|
||||||
|
Console.WriteLine($"Color Property = {colorProperty}, CorrelationId = {receivedMessage.CorrelationId ?? receivedMessage.CorrelationId}");
|
||||||
|
receivedMessageCount++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"{DateTime.Now} :: Received '{receivedMessageCount}' Messages From Subscription: {subscriptionName}");
|
||||||
|
Console.WriteLine($"==========================================================================");
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Create a new method called `MainAsync` with the following code:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
static async Task MainAsync()
|
||||||
|
{
|
||||||
|
topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
|
||||||
|
allMessagessubscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, allMessagesSubscriptionName);
|
||||||
|
sqlFilterOnlySubscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, sqlFilterOnlySubscriptionName);
|
||||||
|
sqlFilterWithActionSubscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, sqlFilterWithActionSubscriptionName);
|
||||||
|
correlationFilterSubscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, correlationFilterSubscriptionName);
|
||||||
|
|
||||||
|
// First Subscription is already created with default rule. Leave as is.
|
||||||
|
|
||||||
|
// 2nd Subscription: Add SqlFilter on Subscription 2
|
||||||
|
// Delete Default Rule.
|
||||||
|
// Add the required SqlFilter Rule
|
||||||
|
// Note: Does not apply to this sample but if there are multiple rules configured for a
|
||||||
|
// single subscription, then one message is delivered to the subscription when any of the
|
||||||
|
// rule matches. If more than one rules match and if there is no `SqlRuleAction` set for the
|
||||||
|
// rule, then only one message will be delivered to the subscription. If more than one rules
|
||||||
|
// match and there is a `SqlRuleAction` specified for the rule, then one message per `SqlRuleAction`
|
||||||
|
// is delivered to the subscription.
|
||||||
|
Console.WriteLine($"SubscriptionName: {sqlFilterOnlySubscriptionName}, Removing Default Rule and Adding SqlFilter");
|
||||||
|
await sqlFilterOnlySubscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
|
||||||
|
await sqlFilterOnlySubscriptionClient.AddRuleAsync(new RuleDescription
|
||||||
|
{
|
||||||
|
Filter = new SqlFilter("Color = 'Red'"),
|
||||||
|
Name = "RedSqlRule"
|
||||||
|
});
|
||||||
|
|
||||||
|
// 3rd Subscription: Add SqlFilter and SqlRuleAction on Subscription 3
|
||||||
|
// Delete Default Rule
|
||||||
|
// Add the required SqlFilter Rule and Action
|
||||||
|
Console.WriteLine($"SubscriptionName: {sqlFilterWithActionSubscriptionName}, Removing Default Rule and Adding SqlFilter and SqlRuleAction");
|
||||||
|
await sqlFilterWithActionSubscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
|
||||||
|
await sqlFilterWithActionSubscriptionClient.AddRuleAsync(new RuleDescription
|
||||||
|
{
|
||||||
|
Filter = new SqlFilter("Color = 'Blue'"),
|
||||||
|
Action = new SqlRuleAction("SET Color = 'BlueProcessed'"),
|
||||||
|
Name = "BlueSqlRule"
|
||||||
|
});
|
||||||
|
|
||||||
|
// 4th Subscription: Add Correlation Filter on Subscription 4
|
||||||
|
Console.WriteLine($"SubscriptionName: {sqlFilterWithActionSubscriptionName}, Removing Default Rule and Adding CorrelationFilter");
|
||||||
|
await correlationFilterSubscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
|
||||||
|
await correlationFilterSubscriptionClient.AddRuleAsync(new RuleDescription
|
||||||
|
{
|
||||||
|
Filter = new CorrelationFilter() { Label = "Red", CorrelationId = "important" },
|
||||||
|
Name = "ImportantCorrelationRule"
|
||||||
|
});
|
||||||
|
|
||||||
|
// Get Rules on Subscription, called here only for one subscription as example
|
||||||
|
var rules = (await correlationFilterSubscriptionClient.GetRulesAsync()).ToList();
|
||||||
|
Console.WriteLine($"GetRules:: SubscriptionName: {correlationFilterSubscriptionName}, CorrelationFilter Name: {rules[0].Name}, Rule: {rules[0].Filter}");
|
||||||
|
|
||||||
|
// Send messages to Topic
|
||||||
|
await SendMessagesAsync();
|
||||||
|
|
||||||
|
// Receive messages from 'allMessagesSubscriptionName'. Should receive all 9 messages
|
||||||
|
await ReceiveMessagesAsync(allMessagesSubscriptionName);
|
||||||
|
|
||||||
|
// Receive messages from 'sqlFilterOnlySubscriptionName'. Should receive all messages with Color = 'Red' i.e 3 messages
|
||||||
|
await ReceiveMessagesAsync(sqlFilterOnlySubscriptionName);
|
||||||
|
|
||||||
|
// Receive messages from 'sqlFilterWithActionSubscriptionClient'. Should receive all messages with Color = 'Blue'
|
||||||
|
// i.e 3 messages AND all messages should have color set to 'BlueProcessed'
|
||||||
|
await ReceiveMessagesAsync(sqlFilterWithActionSubscriptionName);
|
||||||
|
|
||||||
|
// Receive messages from 'correlationFilterSubscriptionName'. Should receive all messages with Color = 'Red' and CorrelationId = "important"
|
||||||
|
// i.e 1 message
|
||||||
|
await ReceiveMessagesAsync(correlationFilterSubscriptionName);
|
||||||
|
|
||||||
|
Console.WriteLine("=========================================================");
|
||||||
|
Console.WriteLine("Completed Receiving all messages... Press any key to exit");
|
||||||
|
Console.WriteLine("=========================================================");
|
||||||
|
|
||||||
|
Console.ReadKey();
|
||||||
|
|
||||||
|
await allMessagessubscriptionClient.CloseAsync();
|
||||||
|
await sqlFilterOnlySubscriptionClient.CloseAsync();
|
||||||
|
await sqlFilterWithActionSubscriptionClient.CloseAsync();
|
||||||
|
await correlationFilterSubscriptionClient.CloseAsync();
|
||||||
|
await topicClient.CloseAsync();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
1. Add the following code to the `Main` method:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
MainAsync(args).GetAwaiter().GetResult();
|
||||||
|
```
|
||||||
|
|
||||||
|
Congratulations! You have now learnt to configure and manage rules for a ServiceBus Topic Subscription.
|
Загрузка…
Ссылка в новой задаче