Merge pull request #469 from spelluru/newfiltersample2
Azure.Messaging.ServiceBus sample for subscription filters
This commit is contained in:
Коммит
6eaf44ddaa
|
@ -0,0 +1,15 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net6.0</TargetFramework>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.8.0-beta.2" />
|
||||
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
|
||||
<PackageReference Include="System.Diagnostics.Debug" Version="4.3.0" />
|
||||
<PackageReference Include="System.Linq" Version="4.3.0" />
|
||||
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" />
|
||||
</ItemGroup>
|
||||
</Project>
|
|
@ -0,0 +1,97 @@
|
|||
//
|
||||
// Copyright © Microsoft Corporation, All Rights Reserved
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
// OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION
|
||||
// ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A
|
||||
// PARTICULAR PURPOSE, MERCHANTABILITY OR NON-INFRINGEMENT.
|
||||
//
|
||||
// See the Apache License, Version 2.0 for the specific language
|
||||
// governing permissions and limitations under the License.
|
||||
|
||||
namespace CreateTopicsAndSubscriptionsWithFilters
|
||||
{
|
||||
using Azure.Messaging.ServiceBus.Administration;
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
public class Program
|
||||
{
|
||||
// Service Bus Administration Client object to create topics and subscriptions
|
||||
static ServiceBusAdministrationClient adminClient;
|
||||
|
||||
// connection string to the Service Bus namespace
|
||||
static readonly string connectionString = "<SERVICE BUS NAMESPACE - CONNECTION STRING>";
|
||||
|
||||
// name of the Service Bus topic
|
||||
static readonly string topicName = "topicfiltersampletopic";
|
||||
|
||||
// names of subscriptions to the topic
|
||||
static readonly string subscriptionAllOrders = "AllOrders";
|
||||
static readonly string subscriptionColorRed = "ColorRed";
|
||||
static readonly string subscriptionColorBlueSize10Orders = "ColorBlueSize10Orders";
|
||||
static readonly string subscriptionHighPriorityRedOrders = "HighPriorityRedOrders";
|
||||
|
||||
public static async Task Main()
|
||||
{
|
||||
try
|
||||
{
|
||||
|
||||
Console.WriteLine("Creating the Service Bus Administration Client object");
|
||||
adminClient = new ServiceBusAdministrationClient(connectionString);
|
||||
|
||||
Console.WriteLine($"Creating the topic {topicName}");
|
||||
await adminClient.CreateTopicAsync(topicName);
|
||||
|
||||
Console.WriteLine($"Creating the subscription {subscriptionAllOrders} for the topic with a SQL filter ");
|
||||
|
||||
// Create a True Rule filter with an expression that always evaluates to true
|
||||
// It's equivalent to using SQL rule filter with 1=1 as the expression
|
||||
|
||||
await adminClient.CreateSubscriptionAsync(
|
||||
new CreateSubscriptionOptions(topicName, subscriptionAllOrders),
|
||||
new CreateRuleOptions("AllOrders", new TrueRuleFilter()));
|
||||
|
||||
Console.WriteLine($"Creating the subscription {subscriptionColorBlueSize10Orders} with a SQL filter");
|
||||
// Create a SQL filter with color set to blue and quantity to 10
|
||||
await adminClient.CreateSubscriptionAsync(
|
||||
new CreateSubscriptionOptions(topicName, subscriptionColorBlueSize10Orders),
|
||||
new CreateRuleOptions("BlueSize10Orders", new SqlRuleFilter("color='blue' AND quantity=10")));
|
||||
|
||||
Console.WriteLine($"Creating the subscription {subscriptionColorRed} with a SQL filter");
|
||||
// Create a SQL filter with color equals to red and a SQL action with a set of statements
|
||||
await adminClient.CreateSubscriptionAsync(topicName, subscriptionColorRed);
|
||||
// remove the $Default rule
|
||||
await adminClient.DeleteRuleAsync(topicName, subscriptionColorRed, "$Default");
|
||||
// now create the new rule. notice that user. prefix is used for the user/application property
|
||||
await adminClient.CreateRuleAsync(topicName, subscriptionColorRed, new CreateRuleOptions
|
||||
{
|
||||
Name = "RedOrdersWithAction",
|
||||
Filter = new SqlRuleFilter("user.color='red'"),
|
||||
Action = new SqlRuleAction("SET quantity = quantity / 2; REMOVE priority;SET sys.CorrelationId = 'low';")
|
||||
|
||||
}
|
||||
);
|
||||
|
||||
Console.WriteLine($"Creating the subscription {subscriptionHighPriorityRedOrders} with a correlation filter");
|
||||
// Create a correlation filter with color set to Red and priority set to High
|
||||
await adminClient.CreateSubscriptionAsync(
|
||||
new CreateSubscriptionOptions(topicName, subscriptionHighPriorityRedOrders),
|
||||
new CreateRuleOptions("HighPriorityRedOrders", new CorrelationRuleFilter() {Subject = "red", CorrelationId = "high"} ));
|
||||
|
||||
// delete resources
|
||||
//await adminClient.DeleteTopicAsync(topicName);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Console.WriteLine(e.ToString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
# Topic Subscription Filters
|
||||
This sample illustrates creating filtered subscriptions for topics.
|
||||
|
||||
## CreateTopicAndSubscription
|
||||
|
||||
1. In the **Program.cs** file, replace `<SERVICE BUS NAMESPACE - CONNECTION STRING>` with the connection string to your Service Bus namespace.
|
||||
1. Build the project.
|
||||
1. Run the app, which creates the following Service Bus entities:
|
||||
|
||||
1. A topic named `TopicFilterSampleTopic`.
|
||||
2. Subscriptions to the above topic with the following settings.
|
||||
|
||||
| Subscription name | Filter type | Filter expression | Action |
|
||||
| --------- | ----------- | ------------------ | ------ |
|
||||
| AllOrders | True Rule filter | `1=1` | |
|
||||
| ColorBlueSize10Orders | SQL filter | `color = 'blue' AND quantity = 10` | |
|
||||
| ColorRed | SQL filter | `color = 'red'` | `SET quantity = quantity / 2;REMOVE priority; SET sys.CorrelationId = 'low'`|
|
||||
| ColorRed | Correlation filter | `"label": "red", "correlationId": "high"` | |
|
||||
|
||||
## SendAndReceiveMessages
|
||||
|
||||
1. In the **Program.cs** file, replace `<SERVICE BUS NAMESPACE - CONNECTION STRING>` with the connection string to your Service Bus namespace.
|
||||
1. Build the project.
|
||||
1. Run the app and see the output.
|
||||
|
||||
```bash
|
||||
Sending orders to topic.
|
||||
Sent order with Color=yellow, Quantity=5, Priority=low
|
||||
Sent order with Color=blue, Quantity=10, Priority=low
|
||||
Sent order with Color=blue, Quantity=5, Priority=high
|
||||
Sent order with Color=blue, Quantity=5, Priority=low
|
||||
Sent order with Color=red, Quantity=5, Priority=low
|
||||
Sent order with Color=yellow, Quantity=5, Priority=low
|
||||
Sent order with Color=yellow, Quantity=10, Priority=high
|
||||
Sent order with Color=, Quantity=0, Priority=
|
||||
Sent order with Color=blue, Quantity=10, Priority=low
|
||||
Sent order with Color=red, Quantity=10, Priority=low
|
||||
Sent order with Color=red, Quantity=10, Priority=high
|
||||
Sent order with Color=yellow, Quantity=10, Priority=low
|
||||
Sent order with Color=red, Quantity=5, Priority=low
|
||||
All messages sent.
|
||||
|
||||
Receiving messages from subscription AllOrders.
|
||||
color=blue,quantity=5,priority=low,CorrelationId=low
|
||||
color=red,quantity=10,priority=high,CorrelationId=high
|
||||
color=yellow,quantity=5,priority=low,CorrelationId=low
|
||||
color=blue,quantity=10,priority=low,CorrelationId=low
|
||||
color=blue,quantity=5,priority=high,CorrelationId=high
|
||||
color=blue,quantity=10,priority=low,CorrelationId=low
|
||||
color=red,quantity=5,priority=low,CorrelationId=low
|
||||
color=red,quantity=10,priority=low,CorrelationId=low
|
||||
color=red,quantity=5,priority=low,CorrelationId=low
|
||||
color=yellow,quantity=10,priority=high,CorrelationId=high
|
||||
color=yellow,quantity=5,priority=low,CorrelationId=low
|
||||
color=yellow,quantity=10,priority=low,CorrelationId=low
|
||||
color=,quantity=0,priority=,CorrelationId=
|
||||
Received 13 messages from subscription AllOrders.
|
||||
|
||||
Receiving messages from subscription ColorBlueSize10Orders.
|
||||
color=blue,quantity=10,priority=low,CorrelationId=low
|
||||
color=blue,quantity=10,priority=low,CorrelationId=low
|
||||
Received 2 messages from subscription ColorBlueSize10Orders.
|
||||
|
||||
Receiving messages from subscription ColorRed.
|
||||
color=red,quantity=5,priority=high,RuleName=RedOrdersWithAction,CorrelationId=high
|
||||
color=red,quantity=2,priority=low,RuleName=RedOrdersWithAction,CorrelationId=low
|
||||
color=red,quantity=5,priority=low,RuleName=RedOrdersWithAction,CorrelationId=low
|
||||
color=red,quantity=2,priority=low,RuleName=RedOrdersWithAction,CorrelationId=low
|
||||
Received 4 messages from subscription ColorRed.
|
||||
|
||||
Receiving messages from subscription HighPriorityRedOrders.
|
||||
color=red,quantity=10,priority=high,CorrelationId=high
|
||||
Received 1 messages from subscription HighPriorityRedOrders.
|
||||
```
|
||||
|
||||
## Next steps
|
||||
[Read more about topic filters in the documentation.](https://docs.microsoft.com/azure/service-bus-messaging/topic-filters)
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
//----------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//----------------------------------------------------------------
|
||||
|
||||
namespace SendAndReceiveMessages
|
||||
{
|
||||
class Order
|
||||
{
|
||||
public string Color
|
||||
{
|
||||
get;
|
||||
set;
|
||||
}
|
||||
|
||||
public int Quantity
|
||||
{
|
||||
get;
|
||||
set;
|
||||
}
|
||||
|
||||
public string Priority
|
||||
{
|
||||
get;
|
||||
set;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,152 @@
|
|||
//
|
||||
// Copyright © Microsoft Corporation, All Rights Reserved
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
// OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION
|
||||
// ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A
|
||||
// PARTICULAR PURPOSE, MERCHANTABILITY OR NON-INFRINGEMENT.
|
||||
//
|
||||
// See the Apache License, Version 2.0 for the specific language
|
||||
// governing permissions and limitations under the License.
|
||||
|
||||
namespace SendAndReceiveMessages
|
||||
{
|
||||
using System;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
public class Program
|
||||
{
|
||||
const string TopicName = "TopicFilterSampleTopic";
|
||||
const string SubscriptionAllMessages = "AllOrders";
|
||||
const string SubscriptionColorBlueSize10Orders = "ColorBlueSize10Orders";
|
||||
const string SubscriptionColorRed = "ColorRed";
|
||||
const string SubscriptionHighPriorityOrders = "HighPriorityRedOrders";
|
||||
|
||||
// connection string to your Service Bus namespace
|
||||
static string connectionString = "<SERVICE BUS NAMESPACE - CONNECTION STRING>";
|
||||
|
||||
// the client that owns the connection and can be used to create senders and receivers
|
||||
static ServiceBusClient client;
|
||||
|
||||
// the sender used to publish messages to the topic
|
||||
static ServiceBusSender sender;
|
||||
|
||||
// the receiver used to receive messages from the subscription
|
||||
static ServiceBusReceiver receiver;
|
||||
|
||||
public async Task SendAndReceiveTestsAsync(string connectionString)
|
||||
{
|
||||
// This sample demonstrates how to use advanced filters with ServiceBus topics and subscriptions.
|
||||
// The sample creates a topic and 3 subscriptions with different filter definitions.
|
||||
// Each receiver will receive matching messages depending on the filter associated with a subscription.
|
||||
|
||||
// Send sample messages.
|
||||
await this.SendMessagesToTopicAsync(connectionString);
|
||||
|
||||
// Receive messages from subscriptions.
|
||||
await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionAllMessages);
|
||||
await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionColorBlueSize10Orders);
|
||||
await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionColorRed);
|
||||
await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionHighPriorityOrders);
|
||||
}
|
||||
|
||||
|
||||
async Task SendMessagesToTopicAsync(string connectionString)
|
||||
{
|
||||
// Create the clients that we'll use for sending and processing messages.
|
||||
client = new ServiceBusClient(connectionString);
|
||||
sender = client.CreateSender(TopicName);
|
||||
|
||||
Console.WriteLine("\nSending orders to topic.");
|
||||
|
||||
// Now we can start sending orders.
|
||||
await Task.WhenAll(
|
||||
SendOrder(sender, new Order()),
|
||||
SendOrder(sender, new Order { Color = "blue", Quantity = 5, Priority = "low" }),
|
||||
SendOrder(sender, new Order { Color = "red", Quantity = 10, Priority = "high" }),
|
||||
SendOrder(sender, new Order { Color = "yellow", Quantity = 5, Priority = "low" }),
|
||||
SendOrder(sender, new Order { Color = "blue", Quantity = 10, Priority = "low" }),
|
||||
SendOrder(sender, new Order { Color = "blue", Quantity = 5, Priority = "high" }),
|
||||
SendOrder(sender, new Order { Color = "blue", Quantity = 10, Priority = "low" }),
|
||||
SendOrder(sender, new Order { Color = "red", Quantity = 5, Priority = "low" }),
|
||||
SendOrder(sender, new Order { Color = "red", Quantity = 10, Priority = "low" }),
|
||||
SendOrder(sender, new Order { Color = "red", Quantity = 5, Priority = "low" }),
|
||||
SendOrder(sender, new Order { Color = "yellow", Quantity = 10, Priority = "high" }),
|
||||
SendOrder(sender, new Order { Color = "yellow", Quantity = 5, Priority = "low" }),
|
||||
SendOrder(sender, new Order { Color = "yellow", Quantity = 10, Priority = "low" })
|
||||
);
|
||||
|
||||
Console.WriteLine("All messages sent.");
|
||||
}
|
||||
|
||||
async Task SendOrder(ServiceBusSender sender, Order order)
|
||||
{
|
||||
var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(order)))
|
||||
{
|
||||
CorrelationId = order.Priority,
|
||||
Subject = order.Color,
|
||||
ApplicationProperties =
|
||||
{
|
||||
{ "color", order.Color },
|
||||
{ "quantity", order.Quantity },
|
||||
{ "priority", order.Priority }
|
||||
}
|
||||
};
|
||||
await sender.SendMessageAsync(message);
|
||||
|
||||
Console.WriteLine("Sent order with Color={0}, Quantity={1}, Priority={2}", order.Color, order.Quantity, order.Priority);
|
||||
}
|
||||
|
||||
async Task ReceiveAllMessageFromSubscription(string connectionString, string subsName)
|
||||
{
|
||||
var receivedMessages = 0;
|
||||
|
||||
receiver = client.CreateReceiver(TopicName, subsName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete } );
|
||||
|
||||
// Create a receiver from the subscription client and receive all messages.
|
||||
Console.WriteLine("\nReceiving messages from subscription {0}.", subsName);
|
||||
|
||||
while (true)
|
||||
{
|
||||
var receivedMessage = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10));
|
||||
if (receivedMessage != null)
|
||||
{
|
||||
foreach (var prop in receivedMessage.ApplicationProperties)
|
||||
{
|
||||
Console.Write("{0}={1},", prop.Key, prop.Value);
|
||||
}
|
||||
Console.WriteLine("CorrelationId={0}", receivedMessage.CorrelationId);
|
||||
receivedMessages++;
|
||||
}
|
||||
else
|
||||
{
|
||||
// No more messages to receive.
|
||||
break;
|
||||
}
|
||||
}
|
||||
Console.WriteLine("Received {0} messages from subscription {1}.", receivedMessages, subsName);
|
||||
}
|
||||
|
||||
public static async Task Main()
|
||||
{
|
||||
try
|
||||
{
|
||||
Program app = new Program();
|
||||
await app.SendAndReceiveTestsAsync(connectionString);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Console.WriteLine(e.ToString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFrameworks>net6.0</TargetFrameworks>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<None Remove="readme.md" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.7.0" />
|
||||
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
|
||||
<PackageReference Include="System.Diagnostics.Debug" Version="4.3.0" />
|
||||
<PackageReference Include="System.Linq" Version="4.3.0" />
|
||||
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" />
|
||||
</ItemGroup>
|
||||
</Project>
|
|
@ -0,0 +1,31 @@
|
|||
|
||||
Microsoft Visual Studio Solution File, Format Version 12.00
|
||||
# Visual Studio Version 16
|
||||
VisualStudioVersion = 16.0.32228.343
|
||||
MinimumVisualStudioVersion = 10.0.40219.1
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CreateTopicsAndSubscriptions", "CreateTopicAndSubscriptions\CreateTopicsAndSubscriptions.csproj", "{46F31904-B0ED-411C-93E2-DA2CCBA8FF88}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SendAndReceiveMessages", "SendAndReceiveMessages\SendAndReceiveMessages.csproj", "{8AB3BE77-4879-4034-9BE3-4CEFAAA31D1E}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
Release|Any CPU = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(ProjectConfigurationPlatforms) = postSolution
|
||||
{46F31904-B0ED-411C-93E2-DA2CCBA8FF88}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{46F31904-B0ED-411C-93E2-DA2CCBA8FF88}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{46F31904-B0ED-411C-93E2-DA2CCBA8FF88}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{46F31904-B0ED-411C-93E2-DA2CCBA8FF88}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{8AB3BE77-4879-4034-9BE3-4CEFAAA31D1E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{8AB3BE77-4879-4034-9BE3-4CEFAAA31D1E}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{8AB3BE77-4879-4034-9BE3-4CEFAAA31D1E}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{8AB3BE77-4879-4034-9BE3-4CEFAAA31D1E}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {496C4E54-35CB-47AC-9D0E-9A080C1F3A31}
|
||||
EndGlobalSection
|
||||
EndGlobal
|
Загрузка…
Ссылка в новой задаче