This commit is contained in:
Clemens Vasters 2017-12-13 06:57:39 +00:00
Родитель 782c30da92 f7aa63e5ff
Коммит 8f7e869cb5
163 изменённых файлов: 5226 добавлений и 906 удалений

6
.github/CONTRIBUTING.md поставляемый
Просмотреть файл

@ -26,6 +26,10 @@ If you encounter any service side bugs, please file an issue [here](https://gith
To suggest a new feature or changes that could be made, file an issue the same way you would for a bug, but remove the provided template and replace it with information about your suggestion.
### Reply times
The Service Bus team tries to at least reply to issues as fast as possible but please expect 5-7 business days.
### Pull Requests
You can find all of the pull requests that have been opened in the [Pull Request](https://github.com/Azure/azure-service-bus/pulls) section of the repository.
@ -41,4 +45,4 @@ The following guidelines must be followed in **EVERY** pull request that is open
- Title of the pull request is clear and informative
- There are a small number of commits that each have an informative message
- A description of the changes the pull request makes is included, and a reference to the bug/issue the pull request fixes is included, if applicable
- A description of the changes the pull request makes is included, and a reference to the bug/issue the pull request fixes is included, if applicable

Просмотреть файл

@ -17,4 +17,8 @@ If you are looking for a specific client library, see the following:
## How to provide feedback
See our [Contribution Guidelines](./.github/CONTRIBUTING.md).
See our [Contribution Guidelines](./.github/CONTRIBUTING.md).
## Issue response times
We are trying to do our best, to at least reply to issues as fast as possible, yet in most cases please expect 5-7 business days.

Просмотреть файл

@ -2,11 +2,11 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="1.0.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
</ItemGroup>
</Project>

Просмотреть файл

@ -2,11 +2,11 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="1.0.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
</ItemGroup>
</Project>

Просмотреть файл

@ -2,11 +2,11 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="1.0.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
</ItemGroup>
</Project>

Просмотреть файл

@ -2,11 +2,11 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="1.0.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
</ItemGroup>
</Project>

Просмотреть файл

@ -2,11 +2,11 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="1.0.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
</ItemGroup>
</Project>

Просмотреть файл

@ -2,11 +2,11 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="1.0.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
</ItemGroup>
</Project>

Просмотреть файл

@ -124,7 +124,7 @@ model is strongly encouraged at all times as it yields significantly more effici
* **Durable Senders** - The [DurableSender](./DurableSender) sample shows how to make client applications robust against frequent
network link failures.
* **Geo Replication** - The [GeoReplication](./GeoReplication) sample illustrates how to route messages through two distinct
entities, possibly located in different namespaces in differentr datacenters, to limit the application's availability risk.
entities, possibly located in different namespaces in different datacenters, to limit the application's availability risk.
### Session and Workflow Management Features
@ -143,4 +143,4 @@ model is strongly encouraged at all times as it yields significantly more effici
* **Sessions with the NetMessagingBinding** - The [NetMessagingSession](./NetMessagingSession) sample shows how to use Service Bus
sessions with the NetMessagingBinding.

Просмотреть файл

19
samples/Java/.vscode/tasks.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,19 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"taskName": "verify",
"type": "shell",
"command": "mvn -B verify",
"group": "build"
},
{
"taskName": "test",
"type": "shell",
"command": "mvn -B test",
"group": "test"
}
]
}

Просмотреть файл

@ -0,0 +1,88 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>autoforward</artifactId>
<name>autoforward</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<debuglevel>lines,vars,source</debuglevel>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.microsoft.azure.servicebus.samples.queuesgettingstarted.QueuesGettingStarted</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>[1.2.17,]</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>[1.7.25,]</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>[1.4,]</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>[2.8.2,]</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>[23.0,]</version>
</dependency>
</dependencies>
<groupId>com.microsoft.azure</groupId>
<version>1.0.0</version>
</project>

Просмотреть файл

@ -0,0 +1,76 @@
# Auto-Forward
This sample demonstrates how to automatically forward messages from a queue,
subscription, or deadletter queue into another queue or topic.
Refer to the main [README](../README.md) document for setup instructions.
## What is Auto Forwarding?
The Auto-Forwarding feature enables you to chain the a Topic Subscription or a
Queue to destination Queue or Topic that is part of the same Service Bus
namespace. When the feature is enabled, Service Bus automatically moves any
messages arriving in the source Queue or Subscription into the destination Queue
or Topic.
Auto-Forwarding allows for a range of powerful routing patterns inside Service
Bus, including decoupling of send and receive locations, fan-in, fan-out, and
application-defined partitioning.
[Read more about auto-forwarding in the documentation.][1]
## Sample code
The sample generates 2 messages: M1, and M2. M1 is sent to a source topic
with one subscription, from which it is forwarded to a destination queue. M2 is
sent to the destination queue directly.
The setup template creates the topology for this example as shown here. Note
that the topic whose subscription auto-forwards into the target queue is made
dependent on the target queue, so that the queue is created first. The
connection between the two entitries is made with the ```forwardTo```property of
the subscription pointing to the target queue.
``` JSON
{
"apiVersion": "[variables('apiVersion')]",
"name": "AutoForwardSourceTopic",
"type": "topics",
"dependsOn": [
"[concat('Microsoft.ServiceBus/namespaces/',
parameters('serviceBusNamespaceName'))]",
"AutoForwardTargetQueue",
],
"properties": {},
"resources": [
{
"apiVersion": "[variables('apiVersion')]",
"name": "Forwarder",
"type": "subscriptions",
"dependsOn": [ "AutoForwardSourceTopic" ],
"properties": {
"forwardTo": "AutoForwardTargetQueue"
},
"resources": []
}
]
},
{
"apiVersion": "[variables('apiVersion')]",
"name": "AutoForwardTargetQueue",
"type": "queues",
"dependsOn": [
"[concat('Microsoft.ServiceBus/namespaces/',
parameters('serviceBusNamespaceName'))]",
],
"properties": {},
"resources": []
}
]
}
```
The sample is documented inline in the [AutoForward.java](.\src\main\java\com\microsoft\azure\servicebus\samples\autoforward\AutoForward.java) file.
[1]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-auto-forwarding

Просмотреть файл

@ -0,0 +1,124 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.autoforward;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import static java.nio.charset.StandardCharsets.*;
import java.time.Duration;
import java.util.*;
import java.util.function.Function;
import org.apache.commons.cli.*;
public class AutoForward {
public void run(String connectionString) throws Exception
{
IMessageSender topicSender;
IMessageSender queueSender;
IMessageReceiver targetQueueReceiver;
System.out.printf("\nSending messages\n");
topicSender = ClientFactory.createMessageSenderFromConnectionStringBuilder(
new ConnectionStringBuilder(connectionString, "AutoForwardSourceTopic"));
topicSender.send(createMessage("M1"));
queueSender = ClientFactory.createMessageSenderFromConnectionStringBuilder(
new ConnectionStringBuilder(connectionString, "AutoForwardTargetQueue"));
queueSender.send(createMessage("M2"));
System.out.printf("\nReceiving messages\n");
targetQueueReceiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(
new ConnectionStringBuilder(connectionString, "AutoForwardTargetQueue"), ReceiveMode.PEEKLOCK);
for (int i = 0; i < 2; i++)
{
IMessage message = targetQueueReceiver.receive(Duration.ofSeconds(10));
if (message != null)
{
this.printReceivedMessage(message);
targetQueueReceiver.complete(message.getLockToken());
}
else
{
throw new Exception("Expected message not receive\n");
}
}
targetQueueReceiver.close();
}
void printReceivedMessage(IMessage receivedMessage) {
System.out.printf("Received message:\n" + "\tLabel:\t%s\n" + "\tBody:\t%s\n",
receivedMessage.getLabel(), new String(receivedMessage.getBody(), UTF_8));
if (receivedMessage.getProperties() != null)
for (String p : receivedMessage.getProperties().keySet()) {
System.out.printf("\tProperty:\t%s = %s\n", p, receivedMessage.getProperties().get(p));
}
}
// Create a new Service Bus message.
IMessage createMessage(String label)
{
// Create a Service Bus message.
IMessage msg = new Message(("This is the body of message \"" + label + "\".").getBytes(UTF_8));
msg.setProperties(new HashMap<String, String>(){{
put("Priority", "1");
put("Importance", "High");
}});
msg.setLabel(label);
msg.setTimeToLive(Duration.ofSeconds(90));
return msg;
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
AutoForward app = new AutoForward();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
}

Просмотреть файл

@ -1,8 +1,6 @@
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.qpid.jms=ALL
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.qpid.jms=ERROR
# CONSOLE appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender

Просмотреть файл

@ -0,0 +1,22 @@
package com.microsoft.azure.servicebus.samples.autoforward;
import org.junit.Assert;
import org.junit.Test;
public class AutoForwardTest {
@Test
public void run() throws Exception {
Assert.assertEquals(0,
AutoForward.runApp(new String[0], (connectionString) -> {
AutoForward app = new AutoForward();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
}

Просмотреть файл

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>azure-servicebus-samples-topicclientquickstart</name>
<name>azure-servicebus-samples-queueclientquickstart</name>
<comment></comment>
<projects>
</projects>

Просмотреть файл

@ -7,13 +7,13 @@
"request": "launch",
"stopOnEntry": true,
"cwd": "${workspaceRoot}",
"startupClass": "com.microsoft.azure.servicebus.samples.topicclientquickstart.TopicClientQuickstart",
"startupClass": "com.microsoft.azure.servicebus.samples.queueclientquickstart.QueueClientQuickstart",
"externalConsole": true,
"jdkPath": "${env:JAVA_HOME}/bin",
"sourcePath": ["${workspaceRoot}/src/main/java"], // Indicates where your source (.java) files are
"classpath": ["${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"classpath": ["${workspaceRoot}", "${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"preLaunchTask": "verify",
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-topicclientquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-queueclientquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"args": [] // Command line arguments to pass to the startup class
}
]

Просмотреть файл

@ -7,7 +7,8 @@
"taskName": "verify",
"command": "mvn -B verify",
"type": "shell",
"group": "build"
"group": "build",
"problemMatcher": []
},
{
"taskName": "build",

Просмотреть файл

@ -2,8 +2,8 @@
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>azure-servicebus-samples-topicclientquickstart</artifactId>
<name>azure-servicebus-samples-topicclientquickstart</name>
<artifactId>deadletterqueue</artifactId>
<name>deadletterqueue</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -17,7 +17,7 @@
<configuration>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<debug>true</debug>
<debuglevel>lines,vars,source</debuglevel>
</configuration>
</plugin>
@ -37,25 +37,26 @@
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.microsoft.azure.servicebus.samples.topicclientquickstart.TopicClientQuickstart</mainClass>
<mainClass>com.microsoft.azure.servicebus.samples.deadletterqueue.DeadletterQueue</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>[1.2.17,]</version>
</dependency>
<dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>[1.7.25,]</version>
@ -65,6 +66,16 @@
<artifactId>commons-cli</artifactId>
<version>[1.4,]</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>[2.8.2,]</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<groupId>com.microsoft.azure</groupId>
<version>1.0.0</version>

Просмотреть файл

@ -0,0 +1,33 @@
# Dead-Letter Queues
This sample shows how to move messages to the Dead-letter queue, how to retrieve
messages from it, and resubmit corrected message back into the main queue.
For setup instructions, please refer back to the main [README](../README.md) file.
## What is a Dead-Letter Queue?
All Service Bus Queues and Subscriptions have a secondary sub-queue, called the
*dead-letter queue* (DLQ).
This sub-queue does not need to be explicitly created and cannot be deleted or
otherwise managed independent of the main entity. The purpose of the Dead-Letter
Queue (DLQ) is accept and hold messages that cannot be delivered to any receiver
or messages that could not be processed. Read more about Dead-Letter Queues [in
the product documentation.][1]
## Sample Code
The sample implements two scenarios:
* Send a message and then retrierve and abandon the message until the maximum
delivery count is exhausted and the message is automatically dead-lettered.
* Send a set of messages, and explicitly dead-letter messages that do not match
a certain criterion and would therefore not be processed correctly. The messages
are then picked up from the dead-letter queue, are automatically corrected, and
resubmitted.
The sample code is further documented inline in the [DeadletterQueue.java](.\src\main\java\com\microsoft\azure\servicebus\samples\deadletterqueue\DeadletterQueue.java) file.
[1]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-dead-letter-queues

Просмотреть файл

@ -0,0 +1,292 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.deadletterqueue;
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.google.gson.Gson;
import static java.nio.charset.StandardCharsets.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.apache.commons.cli.*;
public class DeadletterQueue {
static final Gson GSON = new Gson();
public void run(String connectionString) throws Exception {
CompletableFuture<Void> receiveTask;
CompletableFuture<Void> fixUpTask;
IMessageSender sendClient;
sendClient = ClientFactory.createMessageSenderFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, "BasicQueue"));
// max delivery-count scenario
this.sendMessagesAsync(sendClient, 1).join();
this.exceedMaxDelivery(connectionString, "BasicQueue").join();
// fix-up scenario
this.sendMessagesAsync(sendClient, Integer.MAX_VALUE);
receiveTask = this.receiveMessagesAsync(connectionString, "BasicQueue");
fixUpTask = this.PickUpAndFixDeadletters(connectionString, "BasicQueue", sendClient);
// wait for ENTER or 10 seconds elapsing
waitForEnter(10);
receiveTask.cancel(true);
fixUpTask.cancel(true);
CompletableFuture.allOf(
sendClient.closeAsync(),
receiveTask.exceptionally(t -> {if (t instanceof CancellationException) {return null;} throw new RuntimeException((Throwable) t);}),
fixUpTask.exceptionally(t -> {if (t instanceof CancellationException) {return null;} throw new RuntimeException((Throwable) t);})
).join();
}
CompletableFuture<Void> sendMessagesAsync(IMessageSender sendClient, int maxMessages) {
List<HashMap<String, String>> data =
GSON.fromJson(
"[" +
"{'name' = 'Einstein', 'firstName' = 'Albert'}," +
"{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
"{'name' = 'Curie', 'firstName' = 'Marie'}," +
"{'name' = 'Hawking', 'firstName' = 'Steven'}," +
"{'name' = 'Newton', 'firstName' = 'Isaac'}," +
"{'name' = 'Bohr', 'firstName' = 'Niels'}," +
"{'name' = 'Faraday', 'firstName' = 'Michael'}," +
"{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
"{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
"{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
"]",
new TypeToken<List<HashMap<String, String>>>() {
}.getType());
List<CompletableFuture> tasks = new ArrayList<>();
for (int i = 0; i < Math.min(data.size(), maxMessages); i++) {
final String messageId = Integer.toString(i);
Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
message.setContentType("application/json");
message.setLabel(i % 2 == 0 ? "Scientist" : "Physicist");
message.setMessageId(messageId);
message.setTimeToLive(Duration.ofMinutes(2));
System.out.printf("Message sending: Id = %s\n", message.getMessageId());
tasks.add(
sendClient.sendAsync(message).thenRunAsync(() -> {
System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId());
}));
}
return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
}
CompletableFuture<Void> exceedMaxDelivery(String connectionString, String queueName) throws Exception {
IMessageReceiver receiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
while (true) {
IMessage msg = receiver.receive(Duration.ofSeconds(2));
if (msg != null) {
System.out.printf("Picked up message; DeliveryCount %d\n", msg.getDeliveryCount());
receiver.abandon(msg.getLockToken());
} else {
break;
}
}
receiver.close();
IMessageReceiver deadletterReceiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, "BasicQueue/$deadletterqueue"), ReceiveMode.PEEKLOCK);
while (true) {
IMessage msg = deadletterReceiver.receive(Duration.ofSeconds(2));
if (msg != null) {
System.out.printf("\nDeadletter message:\n");
if (msg.getProperties() != null) {
for (String prop : msg.getProperties().keySet()) {
System.out.printf("\t%s=%s\n", prop, msg.getProperties().get(prop));
}
}
deadletterReceiver.complete(msg.getLockToken());
} else {
break;
}
}
deadletterReceiver.close();
return CompletableFuture.completedFuture(null);
}
CompletableFuture receiveMessagesAsync(String connectionString, String queueName) throws Exception {
CompletableFuture running = new CompletableFuture();
QueueClient receiver = new QueueClient(new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
running.whenComplete((r, t) -> {
try {
receiver.close();
} catch (ServiceBusException e) {
System.out.printf(e.getMessage());
}
});
// register the RegisterMessageHandler callback
receiver.registerMessageHandler(
new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
// receives message is passed to callback
if (message.getLabel() != null &&
message.getContentType() != null &&
message.getLabel().contentEquals("Scientist") &&
message.getContentType().contentEquals("application/json")) {
byte[] body = message.getBody();
Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);
System.out.printf(
"\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
"\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
message.getMessageId(),
message.getSequenceNumber(),
message.getEnqueuedTimeUtc(),
message.getExpiresAtUtc(),
message.getContentType(),
scientist != null ? scientist.get("firstName") : "",
scientist != null ? scientist.get("name") : "");
} else {
return receiver.deadLetterAsync(message.getLockToken());
}
return receiver.completeAsync(message.getLockToken());
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, false, Duration.ofMinutes(1)));
return running;
}
CompletableFuture PickUpAndFixDeadletters(String connectionString, String queueName, IMessageSender resubmitSender) throws Exception {
CompletableFuture running = new CompletableFuture();
QueueClient receiver = new QueueClient(new ConnectionStringBuilder(connectionString, "BasicQueue/$deadletterqueue"), ReceiveMode.PEEKLOCK);
running.whenComplete((r, t) -> {
try {
receiver.close();
} catch (ServiceBusException e) {
System.out.printf(e.getMessage());
}
});
// register the RegisterMessageHandler callback
receiver.registerMessageHandler(
new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
try {
IMessage resubmitMessage = new Message(message.getBody());
if (message.getLabel() != null && message.getLabel().contentEquals("Physicist")) {
System.out.printf(
"\n\t\tFixing: \n\t\t\tMessageId = %s, \n\t\t\tSequenceNumber = %s, \n\t\t\tLabel = %s\n",
message.getMessageId(),
message.getSequenceNumber(),
message.getLabel());
resubmitMessage.setMessageId(message.getMessageId());
resubmitMessage.setLabel("Scientist");
resubmitMessage.setContentType(message.getContentType());
resubmitMessage.setTimeToLive(Duration.ofMinutes(2));
resubmitSender.send(resubmitMessage);
}
return receiver.completeAsync(message.getLockToken());
} catch (Exception e) {
CompletableFuture failure = new CompletableFuture();
failure.completeExceptionally(e);
return failure;
}
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, false, Duration.ofMinutes(1)));
return running;
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
DeadletterQueue app = new DeadletterQueue();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
private void waitForEnter(int seconds) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
executor.invokeAny(Arrays.asList(() -> {
System.in.read();
return 0;
}, () -> {
Thread.sleep(seconds * 1000);
return 0;
}));
} catch (Exception e) {
// absorb
}
}
}

Просмотреть файл

@ -1,6 +1,6 @@
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.qpid.jms=ALL
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.qpid.jms=ERROR
# CONSOLE appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender

Просмотреть файл

@ -0,0 +1,21 @@
package com.microsoft.azure.servicebus.samples.deadletterqueue;
import org.junit.Assert;
public class DeadletterQueueTest {
@org.junit.Test
public void runApp() throws Exception {
Assert.assertEquals(0,
DeadletterQueue.runApp(new String[0], (connectionString) -> {
DeadletterQueue app = new DeadletterQueue();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
}

Просмотреть файл

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>azure-servicebus-samples-messagereceiverquickstart</name>
<name>azure-servicebus-samples-queueclientquickstart</name>
<comment></comment>
<projects>
</projects>

Просмотреть файл

@ -7,13 +7,13 @@
"request": "launch",
"stopOnEntry": true,
"cwd": "${workspaceRoot}",
"startupClass": "com.microsoft.azure.servicebus.samples.messagereceiverquickstart.MessageReceiverQuickstart",
"startupClass": "com.microsoft.azure.servicebus.samples.queueclientquickstart.QueueClientQuickstart",
"externalConsole": true,
"jdkPath": "${env:JAVA_HOME}/bin",
"sourcePath": ["${workspaceRoot}/src/main/java"], // Indicates where your source (.java) files are
"classpath": ["${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"classpath": ["${workspaceRoot}", "${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"preLaunchTask": "verify",
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-messagereceiverquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-queueclientquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"args": [] // Command line arguments to pass to the startup class
}
]

Просмотреть файл

@ -7,7 +7,8 @@
"taskName": "verify",
"command": "mvn -B verify",
"type": "shell",
"group": "build"
"group": "build",
"problemMatcher": []
},
{
"taskName": "build",

Просмотреть файл

@ -0,0 +1,87 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>duplicatedetection</artifactId>
<name>duplicatedetection</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<debuglevel>lines,vars,source</debuglevel>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.microsoft.azure.servicebus.samples.queuesgettingstarted.QueuesGettingStarted</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>[1.2.17,]</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>[1.7.25,]</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>[1.4,]</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>[2.8.2,]</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>[23.0,]</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<groupId>com.microsoft.azure</groupId>
<version>1.0.0</version>
</project>

Просмотреть файл

@ -0,0 +1,54 @@
# Duplicate Detection
This sample illustrates the "duplicate detection" feature of Azure Service Bus.
The sample is specifically crafted to demonstrate the effect of duplicate
detection when enabled on a queue or topic. The default setting is for duplicate
detection to be turned off.
For setup instructions, please refer back to the main [README](../README.md) file.
## What is duplicate detection?
Enabling duplicate detection will keep track of the ```MessageId``` of all
messages sent into a queue or topic [during a defined time window][1].
If any new message is sent that carries a ```MessageId``` that has already been
logged during the time window, the message will be reported as being accepted
(the send operation succeeds), but the newly sent message will be instantly
ignored and dropped. No other parts of the message are considered.
[Read more about duplicate detection in the documentation.][2]
## Sample Code
The sample sends two messages that have the same ```MessageId``` and shows that
only one of those messages is being enqueued and retrievable, if the queue has
the duplicate-detection flag set.
The setup template creates the queue for this example by setting the
```requiresDuplicateDetection``` flag, which enables the feature, and it sets
the ```duplicateDetectionHistoryTimeWindow``` to 10 minutes.
``` JSON
{
"apiVersion": "[variables('apiVersion')]",
"name": "DupdetectQueue",
"type": "queues",
"dependsOn": [
"[concat('Microsoft.ServiceBus/namespaces/',
parameters('serviceBusNamespaceName'))]",
],
"properties": {
"requiresDuplicateDetection": true,
"duplicateDetectionHistoryTimeWindow" : "T10M"
},
"resources": []
},
```
The sample is further documented inline in the [DuplicateDetection.java](.\src\main\java\com\microsoft\azure\servicebus\samples\duplicatedetection\DuplicateDetection.java) file.
[1]: https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection#enable-duplicate-detection
[2]: https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection

Просмотреть файл

@ -0,0 +1,115 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.duplicatedetection;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import java.time.Duration;
import java.util.*;
import java.util.function.Function;
import org.apache.commons.cli.*;
public class DuplicateDetection {
void run(String connectionString) throws Exception {
send(connectionString);
receive(connectionString);
}
void send(String connectionString) throws Exception {
IMessageSender sender = ClientFactory.createMessageSenderFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, "DupdetectQueue"));
String messageId = UUID.randomUUID().toString();
// Send messages to queue
System.out.printf("\tSending messages to %s ...\n", sender.getEntityPath());
IMessage message = new Message();
message.setMessageId(messageId);
message.setTimeToLive(Duration.ofMinutes(1));
sender.send(message);
System.out.printf("\t=> Sent a message with messageId %s\n", message.getMessageId());
IMessage message2 = new Message();
message2.setMessageId(messageId);
message2.setTimeToLive(Duration.ofMinutes(1));
sender.send(message2);
System.out.printf("\t=> Sent a duplicate message with messageId %s\n", message.getMessageId());
sender.close();
}
void receive(String connectionString) throws Exception {
IMessageReceiver receiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, "DupdetectQueue"), ReceiveMode.PEEKLOCK);
// receive messages from queue
String receivedMessageId = "";
System.out.printf("\n\tWaiting up to 5 seconds for messages from %s ...\n", receiver.getEntityPath());
while (true) {
IMessage receivedMessage = receiver.receive(Duration.ofSeconds(5));
if (receivedMessage == null) {
break;
}
System.out.printf("\t<= Received a message with messageId %s\n", receivedMessage.getMessageId());
receiver.complete(receivedMessage.getLockToken());
if (receivedMessageId.contentEquals(receivedMessage.getMessageId())) {
throw new Exception("Received a duplicate message!");
}
receivedMessageId = receivedMessage.getMessageId();
}
System.out.printf("\tDone receiving messages from %s\n", receiver.getEntityPath());
receiver.close();
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
DuplicateDetection app = new DuplicateDetection();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
}

Просмотреть файл

@ -1,8 +1,6 @@
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.qpid.jms=ALL
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.qpid.jms=ERROR
# CONSOLE appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender

Просмотреть файл

@ -0,0 +1,22 @@
package com.microsoft.azure.servicebus.samples.duplicatedetection;
import org.junit.Assert;
public class DuplicateDetectionTest {
@org.junit.Test
public void runApp() throws Exception {
Assert.assertEquals(0,
DuplicateDetection.runApp(new String[0], (connectionString) -> {
DuplicateDetection app = new DuplicateDetection();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
}

Просмотреть файл

@ -1,49 +0,0 @@
# Managing Topic Rules
This sample demonstrates how to manage Topic subscription rules with the Azure Service Bus SDK for Java.
Expanding from the [TopicClientQuickstart](../TopicClientQuickstart) base sample, you will learn how to
modify the rules of existing subscriptions at runtime in order to change the conditions under which
messages are received from these subscriptions.
The sample demonstrates all available rule types:
* TrueFilter - matches all messages
* SqlFilter - matches messages with a message selector on message metadata
* SqlFilter with SqlAction - additionally performs a transform on message metadata
* CorrelationFilter - performs a simple metadata property match
Correlation filters yield significantly higher performance and therefore lower latency and
throughput on a Topic than SQL filters and are therefore preferred.
## Prerequisites
Please refer to the [overview README](../../readme.md) for prerequisites and setting up the samples
environment, including creating a Service Bus cloud namespace.
## Build and run
The sample can be built independently with
```bash
mvn clean package
```
and then run with (or just from VS Code or another Java IDE)
```bash
java -jar ./target/azure-servicebus-samples-managingtopicrules-1.0.0-jar-with-dependencies.jar
```
The sample accepts two arguments that can either be supplied on the command line or via environment
variables. The setup script discussed in the overview readme sets the environment variables for you.
* -c (env: SB_SAMPLES_CONNECTIONSTRING) - Service Bus connection string with credentials or
token granting send and listen rights for the namespace
* -t (env: SB_SAMPLES_TOPICNAME) - Name of an existing topic within the namespace
* -s (env: SB_SAMPLES_SUBSCRIPTIONNAME) - Name of an existing subscription on the given topic
## Sample Code
For a discussion of the sample code, review the inline comments in [ManagingTopicRules.java](./src/main/java/com/microsoft/azure/servicebus/samples/managingtopicrules/ManagingTopicRules.java)

Просмотреть файл

@ -1,213 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.managingtopicrules;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.rules.CorrelationFilter;
import com.microsoft.azure.servicebus.rules.RuleDescription;
import com.microsoft.azure.servicebus.rules.SqlFilter;
import com.microsoft.azure.servicebus.rules.SqlRuleAction;
import org.apache.commons.cli.*;
import org.apache.log4j.*;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class ManagingTopicRules {
// Connection String for the namespace can be obtained from the Azure portal under the
// 'Shared Access policies' section.
private static String connectionString = System.getenv("SB_SAMPLES_CONNECTIONSTRING");
// Name of an existing Topic within the Service Bus namespace
private static String topicName = System.getenv("SB_SAMPLES_TOPICNAME");
// The following 4 subscriptions witgh these exact names are expect to exist on
// the given topic:
private static final String allMessagesSubscriptionName = "allMessages";
private static final String sqlFilterOnlySubscriptionName = "sqlFilterOnly";
private static final String sqlFilterWithActionSubscriptionName = "sqlFilterWithAction";
private static final String correlationFilterSubscriptionName = "correlationFilter";
// topic client
private static ITopicClient topicClient;
// log4j logger
private static Logger logger = Logger.getRootLogger();
public static void main(String[] args) throws Exception {
if (!parseCommandLine(args)) {
return;
}
logger.info("Starting TopicSubscriptionWithRuleOperations sample.");
// create client
logger.info("Create topic client.");
topicClient = new TopicClient(new ConnectionStringBuilder(connectionString, topicName));
logger.info("Create subscription client.");
ISubscriptionClient allMessagessubscriptionClient = new SubscriptionClient(new ConnectionStringBuilder(
connectionString, topicName + "/subscriptions/" + allMessagesSubscriptionName), ReceiveMode.PEEKLOCK);
ISubscriptionClient sqlFilterOnlySubscriptionClient = new SubscriptionClient(new ConnectionStringBuilder(
connectionString, topicName + "/subscriptions/" + sqlFilterOnlySubscriptionName), ReceiveMode.PEEKLOCK);
ISubscriptionClient sqlFilterWithActionSubscriptionClient = new SubscriptionClient(
new ConnectionStringBuilder(connectionString,
topicName + "/subscriptions/" + sqlFilterWithActionSubscriptionName),
ReceiveMode.PEEKLOCK);
ISubscriptionClient correlationFilterSubscriptionClient = new SubscriptionClient(
new ConnectionStringBuilder(connectionString,
topicName + "/subscriptions/" + correlationFilterSubscriptionName),
ReceiveMode.PEEKLOCK);
// Drop existing rules and add a TrueFilter
for (RuleDescription rd : allMessagessubscriptionClient.getRules()) {
allMessagessubscriptionClientgetRules.removeRule(rd.getName());
}
allMessagessubscriptionClientgetRules.addRule(new RuleDescription("MatchAll", new TrueFilter()));
// Drop existing rules and add a SQL filter
for (RuleDescription rd : sqlFilterOnlySubscriptionClient.getRules()) {
sqlFilterOnlySubscriptionClient.removeRule(rd.getName());
}
sqlFilterOnlySubscriptionClient.addRule(new RuleDescription("RedSqlRule", new SqlFilter("Color = 'Red'")));
// Drop existing rules and add a SQL filter with a subsequent action
for (RuleDescription rd : sqlFilterWithActionSubscriptionClient.getRules()) {
sqlFilterWithActionSubscriptionClient.removeRule(rd.getName());
}
RuleDescription sqlRuleWithAction = new RuleDescription("BlueSqlRule", new SqlFilter("Color = 'Blue'"));
sqlRuleWithAction.setAction(new SqlRuleAction("SET Color = 'BlueProcessed'"));
sqlFilterWithActionSubscriptionClient.addRule(sqlRuleWithAction);
// Drop existing rules and add a Correlationfilter
logger.info(String.format("SubscriptionName: %s, Removing Default Rule and Adding CorrelationFilter",
sqlFilterWithActionSubscriptionName));
for (RuleDescription rd : correlationFilterSubscriptionClient.getRules()) {
correlationFilterSubscriptionClient.removeRule(rd.getName());
}
// this correlation filter
CorrelationFilter correlationFilter = new CorrelationFilter();
correlationFilter.setCorrelationId("important");
correlationFilter.setLabel("Red");
correlationFilterSubscriptionClient.addRule(new RuleDescription("ImportantCorrelationRule", correlationFilter));
// Get Rules on Subscription, called here only for one subscription as example
RuleDescription[] rules = correlationFilterSubscriptionClient.getRules().toArray(new RuleDescription[0]);
logger.info(String.format("GetRules:: SubscriptionName: %s, CorrelationFilter Name: %s, Rule: %s",
correlationFilterSubscriptionName, rules[0].getName(), rules[0].getFilter()));
// Send messages to Topic
sendMessages();
// Receive messages from 'allMessagesSubscriptionName'. Should receive all 9 messages
receiveMessages(allMessagesSubscriptionName);
// Receive messages from 'sqlFilterOnlySubscriptionName'. Should receive all messages with Color = 'Red' i.e 3 messages
receiveMessages(sqlFilterOnlySubscriptionName);
// Receive messages from 'sqlFilterWithActionSubscriptionClient'. Should receive all messages with Color = 'Blue'
// i.e 3 messages AND all messages should have color set to 'BlueProcessed'
receiveMessages(sqlFilterWithActionSubscriptionName);
// Receive messages from 'correlationFilterSubscriptionName'. Should receive all messages with Color = 'Red' and CorrelationId = "important"
// i.e 1 message
receiveMessages(correlationFilterSubscriptionName);
logger.info("Completed Receiving all messages...");
logger.info("=========================================================");
allMessagessubscriptionClient.close();
sqlFilterOnlySubscriptionClient.close();
sqlFilterWithActionSubscriptionClient.close();
correlationFilterSubscriptionClient.close();
topicClient.close();
}
private static void sendMessages() {
logger.info("Sending Messages to Topic");
try {
CompletableFuture.allOf(sendMessageAsync("Red", null), sendMessageAsync("Blue", null),
sendMessageAsync("Red", "important"), sendMessageAsync("Blue", "important"),
sendMessageAsync("Red", "notimportant"), sendMessageAsync("Blue", "notimportant"),
sendMessageAsync("Green", null), sendMessageAsync("Green", "important"),
sendMessageAsync("Green", "notimportant")).get();
} catch (Exception exception) {
logger.info(String.format("Exception: %s", exception.getMessage()));
}
}
/*
* Sends message with the subject, a custom property the correlation-Id property set
*/
private static CompletableFuture<Void> sendMessageAsync(String label, String correlationId)
throws ServiceBusException, InterruptedException {
// create a new message
Message message = new Message();
// set the label
message.setLabel(label);
// create a Hashmap for custom properties
Map<String, String> properties = new HashMap<>();
properties.put("Color", label);
// set the custom properties
message.setProperties(properties);
if (correlationId != null) {
message.setCorrelationId(correlationId);
}
// send the message async; when the send operation hads completed, log that fact
return topicClient.sendAsync(message)
.thenRunAsync(() -> logger.info(String.format("Sent Message:: Label: %s, CorrelationId: %s",
message.getLabel(), message.getCorrelationId() == null ? "" : message.getCorrelationId())));
}
/*
* receive sent messages from a given subscription
*/
private static void receiveMessages(String subscriptionName) throws ServiceBusException, InterruptedException {
IMessageReceiver subscriptionReceiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(
new ConnectionStringBuilder(connectionString, topicName + "/subscriptions/" + subscriptionName),
ReceiveMode.RECEIVEANDDELETE);
logger.info(String.format("Receiving Messages From Subscription: %s", subscriptionName));
int receivedMessageCount = 0;
while (true) {
IMessage receivedMessage = subscriptionReceiver.receive(Duration.ofSeconds(5));
if (receivedMessage != null) {
String colorProperty = receivedMessage.getProperties().get("Color");
logger.info(String.format("Color Property = %s, CorrelationId = %s", colorProperty,
receivedMessage.getCorrelationId()));
receivedMessageCount++;
} else {
break;
}
}
logger.info(
String.format("Received '%d' Messages From Subscription: %s", receivedMessageCount, subscriptionName));
subscriptionReceiver.close();
}
static boolean parseCommandLine(String[] args) throws Exception {
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
options.addOption(new Option("t", true, "Topic name"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
if (cl.getOptionValue("t") != null) {
topicName = cl.getOptionValue("t");
}
if (connectionString == null || topicName == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return false;
}
return true;
}
}

Просмотреть файл

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>azure-servicebus-samples-queueclientquickstart</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

20
samples/Java/azure-servicebus/MessageBrowse/.vscode/launch.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,20 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Java Console App",
"type": "java",
"request": "launch",
"stopOnEntry": true,
"cwd": "${workspaceRoot}",
"startupClass": "com.microsoft.azure.servicebus.samples.queueclientquickstart.QueueClientQuickstart",
"externalConsole": true,
"jdkPath": "${env:JAVA_HOME}/bin",
"sourcePath": ["${workspaceRoot}/src/main/java"], // Indicates where your source (.java) files are
"classpath": ["${workspaceRoot}", "${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"preLaunchTask": "verify",
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-queueclientquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"args": [] // Command line arguments to pass to the startup class
}
]
}

Просмотреть файл

@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}

32
samples/Java/azure-servicebus/MessageBrowse/.vscode/tasks.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,32 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"taskName": "verify",
"command": "mvn -B verify",
"type": "shell",
"group": "build",
"problemMatcher": []
},
{
"taskName": "build",
"command": "mvn package",
"type": "shell",
"group": "build"
},
{
"taskName": "compile",
"command": "mvn compile",
"type": "shell",
"group": "build"
},
{
"taskName": "test",
"command": "mvn -B test",
"type": "shell",
"group": "test"
}
]
}

Просмотреть файл

@ -2,9 +2,8 @@
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<version>1.0.0</version>
<artifactId>azure-servicebus-samples-messagereceiverquickstart</artifactId>
<artifactId>messagebrowse</artifactId>
<name>messagebrowse</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -18,6 +17,8 @@
<configuration>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<debuglevel>lines,vars,source</debuglevel>
</configuration>
</plugin>
<plugin>
@ -36,33 +37,46 @@
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.microsoft.azure.servicebus.samples.messagereceiverquickstart.MessageReceiverQuickstart</mainClass>
<mainClass>com.microsoft.azure.servicebus.samples.queuesgettingstarted.QueuesGettingStarted</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>[1.2.17,]</version>
</dependency>
<dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>[1.7.25,]</version>
</dependency>
<dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>[1.4,]</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>[2.8.2,]</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<groupId>com.microsoft.azure</groupId>
<version>1.0.0</version>
</project>

Просмотреть файл

@ -0,0 +1,26 @@
# Message Browsing (Peek)
This sample shows how to enumerate messages residing in a Queue or Topic
subscription without locking and/or deleting them. This feature is typically
used for diagnostic and troubleshooting purposes and/or for tooling built on top
of Service Bus.
[Read more about message browsing in the documentation.][1]
Refer to the main [README](../README.md) document for setup instructions.
## Sample Code
The sample sends a set of messages into a queue and then enumerates them. When
you run the sample repeatedly, you will see that messages accumulate in the log
as we don't receive and remove them.
You will also observe that expired messages (we send with a 2 minute
time-to-live setting) may hang around past their expiration time, because
Service Bus lazily cleans up expired messages no longer available for regular
retrieval.
The sample is documented inline in the [MessageBrowse.java](.\src\main\java\com\microsoft\azure\servicebus\samples\messagebrowse\MessageBrowse.java) file.
[1]: https://docs.microsoft.com/azure/service-bus-messaging/message-browsing

Просмотреть файл

@ -0,0 +1,199 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.messagebrowse;
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.google.gson.Gson;
import static java.nio.charset.StandardCharsets.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import org.apache.commons.cli.*;
public class MessageBrowse {
static final Gson GSON = new Gson();
public void run(String connectionString) throws Exception {
QueueClient sendClient;
IMessageReceiver receiver;
CompletableFuture receiveTask;
// Create a QueueClient instance using the connection string builder
// We set the receive mode to "PeekLock", meaning the message is delivered
// under a lock and must be acknowledged ("completed") to be removed from the queue
sendClient = new QueueClient(
new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
this.sendMessagesAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync());
receiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(
new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
receiveTask = this.peekMessagesAsync(receiver);
// wait for ENTER or 10 seconds elapsing
waitForEnter(10);
receiveTask.cancel(true);
CompletableFuture.allOf(
receiveTask.exceptionally(t -> {if (t instanceof CancellationException) { return null; } throw new RuntimeException((Throwable) t); }),
receiver.closeAsync()).join();
}
CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
List<HashMap<String, String>> data =
GSON.fromJson(
"[" +
"{'name' = 'Einstein', 'firstName' = 'Albert'}," +
"{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
"{'name' = 'Curie', 'firstName' = 'Marie'}," +
"{'name' = 'Hawking', 'firstName' = 'Steven'}," +
"{'name' = 'Newton', 'firstName' = 'Isaac'}," +
"{'name' = 'Bohr', 'firstName' = 'Niels'}," +
"{'name' = 'Faraday', 'firstName' = 'Michael'}," +
"{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
"{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
"{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
"]",
new TypeToken<List<HashMap<String, String>>>() {
}.getType());
List<CompletableFuture> tasks = new ArrayList<>();
for (int i = 0; i < data.size(); i++) {
final String messageId = Integer.toString(i);
Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
message.setContentType("application/json");
message.setLabel("Scientist");
message.setMessageId(messageId);
message.setTimeToLive(Duration.ofMinutes(2));
tasks.add(
sendClient.sendAsync(message).thenRunAsync(() -> {
System.out.printf("Message sent: Id = %s\n", message.getMessageId());
}));
}
return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
}
CompletableFuture peekMessagesAsync(IMessageReceiver receiver) {
CompletableFuture currentTask = new CompletableFuture();
try {
CompletableFuture.runAsync(() -> {
while (!currentTask.isCancelled()) {
try {
IMessage message = receiver.peek();
if (message != null) {
// receives message is passed to callback
if (message.getLabel() != null &&
message.getContentType() != null &&
message.getLabel().contentEquals("Scientist") &&
message.getContentType().contentEquals("application/json")) {
byte[] body = message.getBody();
Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);
System.out.printf(
"\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
"\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
message.getMessageId(),
message.getSequenceNumber(),
message.getEnqueuedTimeUtc(),
message.getExpiresAtUtc(),
message.getContentType(),
scientist != null ? scientist.get("firstName") : "",
scientist != null ? scientist.get("name") : "");
} else {
currentTask.complete(null);
}
}
} catch (Exception e) {
currentTask.completeExceptionally(e);
}
}
if (!currentTask.isCancelled()) {
currentTask.complete(null);
}
});
return currentTask;
} catch (Exception e) {
currentTask.completeExceptionally(e);
}
return currentTask;
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
MessageBrowse app = new MessageBrowse();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
private void waitForEnter(int seconds) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
executor.invokeAny(Arrays.asList(() -> {
System.in.read();
return 0;
}, () -> {
Thread.sleep(seconds * 1000);
return 0;
}));
} catch (Exception e) {
// absorb
}
}
}

Просмотреть файл

@ -0,0 +1,8 @@
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.qpid.jms=ERROR
# CONSOLE appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n

Просмотреть файл

@ -0,0 +1,21 @@
package com.microsoft.azure.servicebus.samples.messagebrowse;
import org.junit.Assert;
public class MessageBrowseTest {
@org.junit.Test
public void runApp() throws Exception {
Assert.assertEquals(0,
MessageBrowse.runApp(new String[0], (connectionString) -> {
MessageBrowse app = new MessageBrowse();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
}

Просмотреть файл

@ -1,40 +0,0 @@
# Message Receiver Quickstart
This sample demonstrates how to use Azure Service Bus Queues with the Azure Service Bus SDK for Java.
You will learn how to set up a MessageSender and MessageReceiver, send messages, and receive those messages by explicitly pulling
from the queue with an asynchronous receive gesture. The callback model shown in the [QueueClientQuickstart](../QueueClientQuickstart)
sample is, however, the recommended method because the receive loop implemented by the SDK library
transparently handles common issues like occasional network issues or transient errors, and also allows
for parallel message handling on multiple worker threads.
## Prerequisites
Please refer to the [overview README](../../readme.md) for prerequisites and setting up the samples
environment, including creating a Service Bus cloud namespace.
## Build and run
The sample can be built independently with
```bash
mvn clean package
```
and then run with (or just from VS Code or another Java IDE)
```bash
java -jar ./target/azure-servicebus-samples-messagereceiverquickstart-1.0.0-jar-with-dependencies.jar
```
The sample accepts two arguments that can either be supplied on the command line or via environment
variables. The setup script discussed in the overview readme sets the environment variables for you.
* -c (env: SB_SAMPLES_CONNECTIONSTRING) - Service Bus connection string with credentials or
token granting send and listen rights for the namespace
* -q (env: SB_SAMPLES_QUEUENAME) - Name of an existing queue within the namespace
## Sample Code Explained
For a discussion of the sample code, review the inline comments in [MessageReceiverQuickstart.java](./src/main/java/com/microsoft/azure/servicebus/samples/messagereceiverquickstart/MessageReceiverQuickstart.java)

Просмотреть файл

@ -1,96 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.messagereceiverquickstart;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import org.apache.commons.cli.*;
import org.apache.log4j.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class MessageReceiverQuickstart {
// connection string can be initialized via environment variable
private static String connectionString = System.getenv("SB_SAMPLES_CONNECTIONSTRING");
// queue name can be initialiyzed via environment variable
private static String queueName = System.getenv("SB_SAMPLES_QUEUENAME");
private static IMessageReceiver messageReceiver;
private static IMessageSender messageSender;
// message send/receive counters
private static int totalSend = 100;
private static AtomicInteger totalReceived = new AtomicInteger(0);
// log4j logger
private static Logger logger = Logger.getRootLogger();
public static void main(String[] args) throws Exception {
if ( !parseCommandLine(args) ) {
return;
}
logger.info("Starting SendReceiveWithMessageSenderReceiver sample.");
logger.info("Create message receiver.");
messageReceiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
logger.info("Create message sender.");
messageSender = ClientFactory.createMessageSenderFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, queueName));
send(messageSender);
receive(messageReceiver);
logger.info("Received all messages, exiting the sample.");
logger.info("Closing message receiver.");
messageReceiver.close();
logger.info("Closing message sender.");
messageSender.close();
}
static void send(IMessageSender sender) {
for (int i = 0; i < totalSend; i++) {
int currentMessageCounter = i;
logger.info(String.format("Sending message #%d.", currentMessageCounter));
sender.sendAsync(new Message("" + i)).thenRunAsync(() -> {
logger.info(String.format("Sent message #%d.", currentMessageCounter));
});
}
}
static void receive(IMessageReceiver receiver) throws InterruptedException, ExecutionException, ServiceBusException {
while (totalReceived.get() != totalSend) {
receiver.receiveAsync().thenAcceptAsync(m -> {
if (m != null) {
logger.info(String.format("Received message with sq#: %d and lock token: %s.", m.getSequenceNumber(), m.getLockToken()));
receiver.completeAsync(m.getLockToken()).thenRunAsync(() -> {
logger.info(String.format("Completed message %d sq#: %d and lock token: %s", totalReceived.incrementAndGet(), m.getSequenceNumber(), m.getLockToken()));
});
}
});
}
}
static boolean parseCommandLine(String[] args) throws Exception{
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
options.addOption(new Option("t", true, "Queue name"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if ( cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
if ( cl.getOptionValue("q") != null) {
queueName = cl.getOptionValue("q");
}
if ( connectionString == null || queueName == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return false;
}
return true;
}
}

Просмотреть файл

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

Просмотреть файл

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>azure-servicebus-samples-queueclientquickstart</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

Просмотреть файл

@ -0,0 +1,5 @@
{
"sourcePath": ["src"],
"classPathFile": "classpath.txt",
"outputDirectory": "target/classes"
}

Просмотреть файл

@ -0,0 +1,20 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Java Console App",
"type": "java",
"request": "launch",
"stopOnEntry": true,
"cwd": "${workspaceRoot}",
"startupClass": "com.microsoft.azure.servicebus.samples.queueclientquickstart.QueueClientQuickstart",
"externalConsole": true,
"jdkPath": "${env:JAVA_HOME}/bin",
"sourcePath": ["${workspaceRoot}/src/main/java"], // Indicates where your source (.java) files are
"classpath": ["${workspaceRoot}", "${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"preLaunchTask": "verify",
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-queueclientquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"args": [] // Command line arguments to pass to the startup class
}
]
}

Просмотреть файл

@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}

Просмотреть файл

@ -0,0 +1,32 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"taskName": "verify",
"command": "mvn -B verify",
"type": "shell",
"group": "build",
"problemMatcher": []
},
{
"taskName": "build",
"command": "mvn package",
"type": "shell",
"group": "build"
},
{
"taskName": "compile",
"command": "mvn compile",
"type": "shell",
"group": "build"
},
{
"taskName": "test",
"command": "mvn -B test",
"type": "shell",
"group": "test"
}
]
}

Просмотреть файл

@ -0,0 +1,82 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>partitionedqueues</artifactId>
<name>partitionedqueues</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<debuglevel>lines,vars,source</debuglevel>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.microsoft.azure.servicebus.samples.partitionedqueues.PartitionedQueues</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>[1.2.17,]</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>[1.7.25,]</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>[1.4,]</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>[2.8.2,]</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<groupId>com.microsoft.azure</groupId>
<version>1.0.0</version>
</project>

Просмотреть файл

@ -0,0 +1,17 @@
# Partitioned Queues
This sample illustrates the specifics of partitioned queues. Service Bus creates
"partitioned" queues by default, which means that the queue log is distributed across
multiple storage backends for minimizing availability risks. This behavior has impact on
the order in which messages will be retrieved, and it also has impact on the sequence
numbering scheme. This sample illustrates this.
[Read more on partitioning in the documentation][1].
Refer to the main [README](../README.md) document for setup instructions.
## Sample Code
The sample is documented inline in the [PartitionedQueues.java](.\src\main\java\com\microsoft\azure\servicebus\samples\partitionedqueues\PartitionedQueues.java) file.
[1]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning

Просмотреть файл

@ -0,0 +1,179 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.partitionedqueues;
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.google.gson.Gson;
import static java.nio.charset.StandardCharsets.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import org.apache.commons.cli.*;
public class PartitionedQueues {
static final Gson GSON = new Gson();
public void run(String connectionString) throws Exception {
QueueClient sendClient;
QueueClient receiveClient;
// Create a QueueClient instance using the connection string builder
// We set the receive mode to "PeekLock", meaning the message is delivered
// under a lock and must be acknowledged ("completed") to be removed from the queue
receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, "PartitionedQueue"), ReceiveMode.PEEKLOCK);
this.registerMessageHandler(receiveClient);
sendClient = new QueueClient(new ConnectionStringBuilder(connectionString, "PartitionedQueue"), ReceiveMode.PEEKLOCK);
this.sendMessagesAsync(sendClient).thenRunAsync(()->sendClient.closeAsync());
// wait for ENTER or 10 seconds elapsing
waitForEnter(10);
receiveClient.close();
}
CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
List<HashMap<String, String>> data =
GSON.fromJson(
"[" +
"{'name' = 'Einstein', 'firstName' = 'Albert'}," +
"{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
"{'name' = 'Curie', 'firstName' = 'Marie'}," +
"{'name' = 'Hawking', 'firstName' = 'Steven'}," +
"{'name' = 'Newton', 'firstName' = 'Isaac'}," +
"{'name' = 'Bohr', 'firstName' = 'Niels'}," +
"{'name' = 'Faraday', 'firstName' = 'Michael'}," +
"{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
"{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
"{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
"]",
new TypeToken<List<HashMap<String, String>>>() {}.getType());
List<CompletableFuture> tasks = new ArrayList<>();
for (int i = 0; i < data.size(); i++) {
final String messageId = Integer.toString(i);
Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
message.setContentType("application/json");
message.setLabel("Scientist");
message.setMessageId(messageId);
message.setTimeToLive(Duration.ofMinutes(2));
message.setPartitionKey(data.get(i).get("name").substring(0, 1));
tasks.add(
sendClient.sendAsync(message).thenRunAsync(() -> {
System.out.printf("Message sent: Id = %s\n", message.getMessageId());
}));
}
return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
}
void registerMessageHandler(QueueClient receiveClient) throws Exception {
// register the RegisterMessageHandler callback
receiveClient.registerMessageHandler(new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
// receives message is passed to callback
if (message.getLabel() != null &&
message.getContentType() != null &&
message.getLabel().contentEquals("Scientist") &&
message.getContentType().contentEquals("application/json")) {
byte[] body = message.getBody();
Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);
System.out.printf(
"\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %08X, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
"\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
message.getMessageId(),
message.getSequenceNumber(),
message.getEnqueuedTimeUtc(),
message.getExpiresAtUtc(),
message.getContentType(),
scientist != null ? scientist.get("firstName") : "",
scientist != null ? scientist.get("name") : "");
}
return receiveClient.completeAsync(message.getLockToken());
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, false, Duration.ofMinutes(1)));
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
PartitionedQueues app = new PartitionedQueues();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
private void waitForEnter(int seconds) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
executor.invokeAny(Arrays.asList(() -> {
System.in.read();
return 0;
}, () -> {
Thread.sleep(seconds * 1000);
return 0;
}));
} catch (Exception e) {
// absorb
}
}
}

Просмотреть файл

@ -0,0 +1,8 @@
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.qpid.jms=ERROR
# CONSOLE appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n

Просмотреть файл

@ -0,0 +1,21 @@
package com.microsoft.azure.servicebus.samples.partitionedqueues;
import org.junit.Assert;
public class PartitionedQueuesTest {
@org.junit.Test
public void runApp() throws Exception {
Assert.assertEquals(0,
PartitionedQueues.runApp(new String[0], (connectionString) -> {
PartitionedQueues app = new PartitionedQueues();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
}

Просмотреть файл

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

Просмотреть файл

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>azure-servicebus-samples-queueclientquickstart</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

Просмотреть файл

@ -0,0 +1,5 @@
{
"sourcePath": ["src"],
"classPathFile": "classpath.txt",
"outputDirectory": "target/classes"
}

20
samples/Java/azure-servicebus/Prefetch/.vscode/launch.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,20 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Java Console App",
"type": "java",
"request": "launch",
"stopOnEntry": true,
"cwd": "${workspaceRoot}",
"startupClass": "com.microsoft.azure.servicebus.samples.queueclientquickstart.QueueClientQuickstart",
"externalConsole": true,
"jdkPath": "${env:JAVA_HOME}/bin",
"sourcePath": ["${workspaceRoot}/src/main/java"], // Indicates where your source (.java) files are
"classpath": ["${workspaceRoot}", "${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"preLaunchTask": "verify",
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-queueclientquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"args": [] // Command line arguments to pass to the startup class
}
]
}

3
samples/Java/azure-servicebus/Prefetch/.vscode/settings.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}

32
samples/Java/azure-servicebus/Prefetch/.vscode/tasks.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,32 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"taskName": "verify",
"command": "mvn -B verify",
"type": "shell",
"group": "build",
"problemMatcher": []
},
{
"taskName": "build",
"command": "mvn package",
"type": "shell",
"group": "build"
},
{
"taskName": "compile",
"command": "mvn compile",
"type": "shell",
"group": "build"
},
{
"taskName": "test",
"command": "mvn -B test",
"type": "shell",
"group": "test"
}
]
}

Просмотреть файл

@ -0,0 +1,87 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>prefetch</artifactId>
<name>prefetch</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<debuglevel>lines,vars,source</debuglevel>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.microsoft.azure.servicebus.samples.queuesgettingstarted.QueuesGettingStarted</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>[1.2.17,]</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>[1.7.25,]</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>[1.4,]</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>[2.8.2,]</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>[23.0,]</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<groupId>com.microsoft.azure</groupId>
<version>1.0.0</version>
</project>

Просмотреть файл

@ -0,0 +1,21 @@
# Prefetch
This sample illustrates the the "prefetch" feature of the Service Bus client.
The sample is specifically crafted to demonstrate the throughput difference
between receiving messages with prefetch turned on and prefetch turned off. The
default setting is for prefetch to be turned off.
Refer to the main [README](../README.md) document for setup instructions.
[Read more about the prefetch feature in the documentation.][1]
## Sample Code
The sample performs two send and receive sequences, once with prefetch turned on
and once with prefetch turned off. You will observe that the variant with
prefetch turned on yields higher throughput, and therefore a shorter execution
time.
The sample is further documented inline in the [Prefetch.java](.\src\main\java\com\microsoft\azure\servicebus\samples\prefetch\Prefetch.java) file.
[1]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-prefetch

Просмотреть файл

@ -0,0 +1,145 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.prefetch;
import com.google.common.base.Stopwatch;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import org.apache.commons.cli.*;
public class Prefetch {
public void run(String connectionString) throws Exception
{
IMessageSender sender;
IMessageReceiver receiver;
// Create communication objects to send and receive on the queue
sender = ClientFactory.createMessageSenderFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, "BasicQueue"));
// run 1
receiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
receiver.setPrefetchCount(0);
// Send and Receive messages with prefetch OFF
long timeTaken1 = this.sendAndReceiveMessages(sender, receiver, 100);
receiver.close();
// run 2
receiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
receiver.setPrefetchCount(50);
// Send and Receive messages with prefetch ON
long timeTaken2 = this.sendAndReceiveMessages(sender, receiver, 100);
receiver.close();
// Calculate the time difference
long timeDifference = timeTaken1 - timeTaken2;
System.out.printf("\nTime difference = %d milliseconds\n", timeDifference);
}
long sendAndReceiveMessages(IMessageSender sender, IMessageReceiver receiver, int messageCount) throws Exception
{
// Now we can start sending messages.
Random rnd = new Random();
byte[] mockPayload = new byte[100]; // 100 random-byte payload
rnd.nextBytes(mockPayload);
System.out.printf("\nSending %d messages to the queue\n", messageCount);
ArrayList<CompletableFuture<Void>> sendOps = new ArrayList<>();
for (int i = 0; i < messageCount; i++)
{
IMessage message = new Message(mockPayload);
message.setTimeToLive(Duration.ofMinutes(5));
sendOps.add(sender.sendAsync(message));
}
CompletableFuture.allOf(sendOps.toArray(new CompletableFuture<?>[sendOps.size()])).join();
System.out.printf("Send completed\n");
// Receive the messages
System.out.printf("Receiving messages...\n");
// Start stopwatch
Stopwatch stopWatch = Stopwatch.createStarted();
IMessage receivedMessage = receiver.receive(Duration.ofSeconds(5));
while (receivedMessage != null) {
// here's where you'd do any work
// complete (round trips)
receiver.complete(receivedMessage.getLockToken());
if (--messageCount <= 0)
break;
// now get the next message
receivedMessage = receiver.receive(Duration.ofSeconds(5));
}
// Stop the stopwatch
stopWatch.stop();
System.out.printf("Receive completed\n");
long timeTaken = stopWatch.elapsed(TimeUnit.MILLISECONDS);
System.out.printf("Time to receive and complete all messages = %d milliseconds\n", timeTaken);
return timeTaken;
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
Prefetch app = new Prefetch();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
}

Просмотреть файл

@ -0,0 +1,8 @@
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.qpid.jms=ERROR
# CONSOLE appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n

Просмотреть файл

@ -0,0 +1,21 @@
package com.microsoft.azure.servicebus.samples.prefetch;
import org.junit.Assert;
public class PrefetchTest {
@org.junit.Test
public void runApp() throws Exception {
Assert.assertEquals(0,
Prefetch.runApp(new String[0], (connectionString) -> {
Prefetch app = new Prefetch();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
}

Просмотреть файл

@ -1,41 +0,0 @@
# Queue Client Quickstart
This sample demonstrates how to use Azure Service Bus Queues with the Azure Service Bus SDK for Java.
You will learn how to set up a QueueClient, send messages, and receive those messages into a callback
handler. The [MessageReceiverQuickstart](../MessageReceiverQuickstart) sample demonstrates how
to receive messages by explicitly pulling from the queue. The callback model shown in this sample
is the recommended method because the receive loop implemented by the SDK library transparently handles
common issues like occasional network issues or transient errors, and also allows for parallel
message handling on multiple worker threads.
## Prerequisites
Please refer to the [overview README](../../readme.md) for prerequisites and setting up the samples
environment, including creating a Service Bus cloud namespace.
## Build and run
The sample can be built independently with
```bash
mvn clean package
```
and then run with (or just from VS Code or another Java IDE)
```bash
java -jar ./target/azure-servicebus-samples-queueclientquickstart-1.0.0-jar-with-dependencies.jar
```
The sample accepts two arguments that can either be supplied on the command line or via environment
variables. The setup script discussed in the overview readme sets the environment variables for you.
* -c (env: SB_SAMPLES_CONNECTIONSTRING) - Service Bus connection string with credentials or
token granting send and listen rights for the namespace
* -q (env: SB_SAMPLES_QUEUENAME) - Name of an existing queue within the namespace
## Sample Code Explained
For a discussion of the sample code, review the inline comments in [QueueClientQuickstart.java](./src/main/java/com/microsoft/azure/servicebus/samples/queueclientquickstart/QueueClientQuickstart.java)

Просмотреть файл

@ -1,103 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.queueclientquickstart;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.apache.commons.cli.*;
import org.apache.log4j.*;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
public class QueueClientQuickstart {
// Connection String for the namespace can be obtained from the Azure portal under the
// 'Shared Access policies' section.
// connection string can be initialized via environment variable
private static String connectionString = System.getenv("SB_SAMPLES_CONNECTIONSTRING");
// queue name can be initialiyzed via environment variable
private static String queueName = System.getenv("SB_SAMPLES_QUEUENAME");
// queue client instance
private static IQueueClient queueClient;
// message send/receive counters
private static int totalToSend = 10;
private static AtomicInteger totalReceived = new AtomicInteger(0);
// log4j logger
private static Logger logger = Logger.getRootLogger();
public static void main(String[] args) throws Exception {
// Parse and evaluate command line. Exit when there's not
// enough configuration information in environment and to
// command line to continue
if (!parseCommandLine(args)) {
return;
}
logger.info("Starting BasicSendReceiveWithQueueClient sample");
// Create a QueueClient instance using the connection string builder
// We set the receive mode to "PeekLock", meaning the message is delivered
// under a lock and must be acknowledged ("completed") to be removed from the queue
logger.info("Create queue client.");
queueClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
// send messages in a loop
for (int i = 0; i < totalToSend; i++) {
int currentMessageCounter = i;
queueClient.sendAsync(new Message("" + i)).thenRunAsync(() -> {
logger.info(String.format("Sent message #%d.", currentMessageCounter));
});
}
// register the anonymous message handler which receives/handles the messages.
queueClient.registerMessageHandler(new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
// receives message is passed to callback
logger.info(String.format("Received message %d with sq#: %d and lock token: %s.",
totalReceived.incrementAndGet(), message.getSequenceNumber(), message.getLockToken()));
return CompletableFuture.completedFuture(null);
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
logger.error(exceptionPhase + "-" + throwable.getMessage());
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, true, Duration.ofMinutes(1)));
// wait on the main thread until all sent messages have been received
while (totalReceived.get() < totalToSend) {
Thread.sleep(1000);
}
logger.info("Received all messages, exiting the sample.");
logger.info("Closing queue client.");
queueClient.close();
}
static boolean parseCommandLine(String[] args) throws Exception {
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
options.addOption(new Option("q", true, "Queue name"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
if (cl.getOptionValue("q") != null) {
queueName = cl.getOptionValue("q");
}
if (connectionString == null || queueName == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return false;
}
return true;
}
}

Просмотреть файл

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

Просмотреть файл

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>azure-servicebus-samples-queueclientquickstart</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

Просмотреть файл

@ -0,0 +1,5 @@
{
"sourcePath": ["src"],
"classPathFile": "classpath.txt",
"outputDirectory": "target/classes"
}

Просмотреть файл

@ -0,0 +1,20 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Java Console App",
"type": "java",
"request": "launch",
"stopOnEntry": true,
"cwd": "${workspaceRoot}",
"startupClass": "com.microsoft.azure.servicebus.samples.queueclientquickstart.QueueClientQuickstart",
"externalConsole": true,
"jdkPath": "${env:JAVA_HOME}/bin",
"sourcePath": ["${workspaceRoot}/src/main/java"], // Indicates where your source (.java) files are
"classpath": ["${workspaceRoot}", "${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"preLaunchTask": "verify",
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-queueclientquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"args": [] // Command line arguments to pass to the startup class
}
]
}

Просмотреть файл

@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}

Просмотреть файл

@ -0,0 +1,32 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"taskName": "verify",
"command": "mvn -B verify",
"type": "shell",
"group": "build",
"problemMatcher": []
},
{
"taskName": "build",
"command": "mvn package",
"type": "shell",
"group": "build"
},
{
"taskName": "compile",
"command": "mvn compile",
"type": "shell",
"group": "build"
},
{
"taskName": "test",
"command": "mvn -B test",
"type": "shell",
"group": "test"
}
]
}

Просмотреть файл

@ -0,0 +1,82 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>queuesgettingstarted</artifactId>
<name>queuesgettingstarted</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<debuglevel>lines,vars,source</debuglevel>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.microsoft.azure.servicebus.samples.queuesgettingstarted.QueuesGettingStarted</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>[1.2.17,]</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>[1.7.25,]</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>[1.4,]</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>[2.8.2,]</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<groupId>com.microsoft.azure</groupId>
<version>1.0.0</version>
</project>

Просмотреть файл

@ -0,0 +1,21 @@
# Getting Started with Service Bus Queues
This sample shows the essential API elements for interacting with messages and a
Service Bus Queue.
You will learn how to establish a connection, and to send and receive messages,
and you will learn about the most important properties of Service Bus messages.
Refer to the main [README](../README.md) document for setup instructions.
## Sample Code
The sample is documented inline in the [QueuesGettingStarted.java](.\src\main\java\com\microsoft\azure\servicebus\samples\queuesgettingstarted\QueuesGettingStarted.java) file.
To keep things reasonably simple, the sample program keeps message sender and
message receiver code within a single hosting application, even though these
roles are often spread across applications, services, or at least across
independently deployed and run tiers of applications or services. For clarity,
the send and receive activities are kept as separate as if they were different
apps and share no API object instances.

Просмотреть файл

@ -0,0 +1,179 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.queuesgettingstarted;
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.google.gson.Gson;
import static java.nio.charset.StandardCharsets.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import org.apache.commons.cli.*;
public class QueuesGettingStarted {
static final Gson GSON = new Gson();
public void run(String connectionString) throws Exception {
// Create a QueueClient instance for receiving using the connection string builder
// We set the receive mode to "PeekLock", meaning the message is delivered
// under a lock and must be acknowledged ("completed") to be removed from the queue
QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
this.registerReceiver(receiveClient);
// Create a QueueClient instance for sending and then asynchronously send messages.
// Close the sender once the send operation is complete.
QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(connectionString, "BasicQueue"), ReceiveMode.PEEKLOCK);
this.sendMessagesAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync());
// wait for ENTER or 10 seconds elapsing
waitForEnter(10);
// shut down receiver to close the receive loop
receiveClient.close();
}
CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
List<HashMap<String, String>> data =
GSON.fromJson(
"[" +
"{'name' = 'Einstein', 'firstName' = 'Albert'}," +
"{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
"{'name' = 'Curie', 'firstName' = 'Marie'}," +
"{'name' = 'Hawking', 'firstName' = 'Steven'}," +
"{'name' = 'Newton', 'firstName' = 'Isaac'}," +
"{'name' = 'Bohr', 'firstName' = 'Niels'}," +
"{'name' = 'Faraday', 'firstName' = 'Michael'}," +
"{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
"{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
"{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
"]",
new TypeToken<List<HashMap<String, String>>>() {}.getType());
List<CompletableFuture> tasks = new ArrayList<>();
for (int i = 0; i < data.size(); i++) {
final String messageId = Integer.toString(i);
Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
message.setContentType("application/json");
message.setLabel("Scientist");
message.setMessageId(messageId);
message.setTimeToLive(Duration.ofMinutes(2));
System.out.printf("\nMessage sending: Id = %s", message.getMessageId());
tasks.add(
sendClient.sendAsync(message).thenRunAsync(() -> {
System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId());
}));
}
return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
}
void registerReceiver(QueueClient queueClient) throws Exception {
// register the RegisterMessageHandler callback
queueClient.registerMessageHandler(new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
// receives message is passed to callback
if (message.getLabel() != null &&
message.getContentType() != null &&
message.getLabel().contentEquals("Scientist") &&
message.getContentType().contentEquals("application/json")) {
byte[] body = message.getBody();
Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);
System.out.printf(
"\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
"\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
message.getMessageId(),
message.getSequenceNumber(),
message.getEnqueuedTimeUtc(),
message.getExpiresAtUtc(),
message.getContentType(),
scientist != null ? scientist.get("firstName") : "",
scientist != null ? scientist.get("name") : "");
}
return CompletableFuture.completedFuture(null);
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, true, Duration.ofMinutes(1)));
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
QueuesGettingStarted app = new QueuesGettingStarted();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
private void waitForEnter(int seconds) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
executor.invokeAny(Arrays.asList(() -> {
System.in.read();
return 0;
}, () -> {
Thread.sleep(seconds * 1000);
return 0;
}));
} catch (Exception e) {
// absorb
}
}
}

Просмотреть файл

@ -0,0 +1,8 @@
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.qpid.jms=ERROR
# CONSOLE appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n

Просмотреть файл

@ -0,0 +1,22 @@
package com.microsoft.azure.servicebus.samples.queuesgettingstarted;
import org.junit.Assert;
public class QueuesGettingStartedTest {
@org.junit.Test
public void runApp() throws Exception {
Assert.assertEquals(0,
QueuesGettingStarted.runApp(new String[0], (connectionString) -> {
QueuesGettingStarted app = new QueuesGettingStarted();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
}

Просмотреть файл

@ -0,0 +1,146 @@
# Azure Service Bus .NET Framework samples
This repository contains the official set of samples for the Azure Service Bus service (Standard and Premium), illustrating all core
features of Service Bus Queues and Service Bus Topics. This samples all use the `WindowsAzure.ServiceBus` NuGet package for
the full .NET Framework.
# Setup
First, clone this git repository locally.
The samples require [creating an Azure subscription](https://azure.microsoft.com/free/) if you don't have one. You also need
a [Service Bus namespace](https://docs.microsoft.com/azure/service-bus-messaging/service-bus-fundamentals-hybrid-solutions),
and a simple basic topology of a few exemplary queues, topics, and subscriptions. To set those up,
with an Azure Service Bus "Standard" namespace, just click the button below and follow the further instructions
on the Azure Portal:
<a href="https://portal.azure.com/#create/Microsoft.Template/uri/https%3A%2F%2Fraw.githubusercontent.com%2Fclemensv%2Fazure-service-bus%2Fmaster%2Fsamples%2FDotNet%2FMicrosoft.ServiceBus.Messaging%2Fscripts%2Fazuredeploy.json" target="_blank">
<img src="http://azuredeploy.net/deploybutton.png"/>
</a>
The free Azure subscription offer includes a service credit that will take you very far with all your
experiments. The prorated [monthly base fee](https://azure.microsoft.com/pricing/details/service-bus/)
for Service Bus Standard includes a generous allocation of message operations, and you can even run a
large [Service Bus Premium namespace](https://docs.microsoft.com/azure/service-bus-messaging/service-bus-premium-messaging)
with 4 Messaging Units for several days.
You can also deploy the resource manager template from the command line:
## Setup using PowerShell
The PowerShell setup is functionally equivalent. You first [create a resource group](https://docs.microsoft.com/azure/azure-resource-manager/powershell-azure-resource-manager) and then [deploy the resource manager template](https://docs.microsoft.com/azure/azure-resource-manager/resource-group-template-deploy):
```powershell
New-AzureRmResourceGroup -Name {rg-name} -Location "Central US"
New-AzureRmResourceGroupDeployment \
-Name {deployment-name} \
-ResourceGroupName {rg-name}
-TemplateFile scripts/azuredeploy.json \
-serviceBusNamespaceName {service-bus-namespace-name}
```
## Exploring and running the samples
To make running the samples straightforward, there is a Powershell ([Azure PS](https://docs.microsoft.com/azure/azure-resource-manager/powershell-azure-resource-manager)) script
in *scripts* that will obtain the namespace connection string from your current Azure subscription, assume the entity names configured in the deployed templates,
and export those into a configuration file in your user directory, eliminating the need to pass arguments on the command line.
The Powershell script is *scripts/setupenv.ps1*. It needs to be called with the name of the Resource Group and the Service Bus namespace name as ordinal arguments.
Run the Powershell script from the scripts directory with
```bash
./setupenv.ps1 {rg-name} {service-bus-namespace-name}
```
## Common Considerations
Most samples use shared [entry-point boilerplate code](common/Main.cs) that loads the configuration and then launches the sample's
**Program.Run()** instance methods.
Except for the samples that explicitly demonstrate security capabilities, all samples are invoked with an externally issued SAS token
rather than a connection string or a raw SAS key. The security model design of Service Bus generally prefers clients to handle tokens
rather than keys, because tokens can be constrained to a particular scope and can be issued to expire at a certain time.
More about SAS and tokens can be found [here](https://azure.microsoft.com/documentation/articles/service-bus-shared-access-signature-authentication/).
All samples use the asynchronous, task-based programming model of the .NET Framework and therefore the *xAsync* overloads of the
respective Service Bus API methods. Since nearly all Service Bus operations result in network I/O, using the asynchronous programming
model is strongly encouraged at all times as it yields significantly more efficient execution at runtime.
> As you are exploring the samples, you should keep in mind that these samples are not aiming to show the simplest way to
> use the Service Bus API, but rather the **recommended**, most robust, and most efficient way to use the Service Bus API.
> The samples are therefore more explicit and take more lines of code than the simplest use of the API would. Distributed
> systems, and especially cloud systems, are dynamic environments and the samples reflect this reality.
## Samples
### Getting Started
* **Getting Started with Queues** - The [QueuesGettingStarted](./QueuesGettingStarted) sample illustrates the basic send and receive gestures
for interacting with a previously provisioned Service Bus Queue. Most other samples in this repository are derivatives of this basic sample.
* **Getting Started with Topics** - The [TopicsGettingStarted](./TopicsGettingStarted) sample illustrates the basic gestures for sending
messages into Topics and receiving them from Subscriptions.
### Message Handling
* **Senders and Receivers with Queues** - The [SendersReceiversWithQueues](./SendersReceiversWithQueues) sample shows how to use the
```MessagingFactory```for explicit connection management and the generic ```MessageSender``` and ```MessageReceiver``` abstractions with queues.
* **Senders and Receivers with Topics** - The [SendersReceiversWithTopics](./SendersReceiversWithTopics) sample is a variation of
the [SendersReceiversWithQueues](./SendersReceiversWithQueues) sample and shows how nearly identical code can be use with Queues and Topics
when using the ```MessageSender``` and ```MessageReceiver``` abstractions.
* **Receive Loop** - [ReceiveLoop](./ReceiveLoop) shows how to use an explicit receive loop with a queues instead of the
recommended, callback-based OnMessage(Async) API used in the "getting started" sample.
* **Message Prefetching** - The [Prefetch](./Prefetch) sample shows the difference between having "prefetch" turned on or off for the receiver.
Prefetch is a background receive operation that acquires messages into a buffer before the application itself calls *Receive* and therefore
optimizes and often accelerates the message flow.
* **Duplicate Detection** - The sample for [DuplicateDetection](./DuplicateDetection) illustrates how Service Bus suppresses the secound and all
further messages sent with an identical *MessageId* when sent during a defined duplicate detection time window when the *RequiresDuplicateDetection*
flag is turned on for a Queue or Topic.
* **Message Browsing** - [MessageBrowse](./MessageBrowse) shows how to enumerate all messages residing in a Queue or Subscription without receiving
or locking them. This method also allows finding deferred and scheduled messages.
* **Auto Forward** - [AutoForward](./AutoForward) illustrates how and why to use automatic forwarding between entities in Service Bus.
### Topics and Subscriptions
* **Topic Filters** - The [TopicFilters](./TopicFilters) sample illustrates how to create and configure filters on Topic Subscriptions.
* **Priority Subscriptions** - the sample [PrioritySubscriptions](./PrioritySubscriptions) shows how to model a "priority queue" pattern
with a Topic, with each priority tier having its own Topic Subscription.
### Partitioned Entities
* **Partitioned Queues** - [PartitionedQueues](./PartitionedQueues) are largely identical in handling to "regular" Queues (and are the default
option when creating new Queues via teh Azure Portal), but are more resilient against slowdowns in the backend storage system.
This sample illustrates some special considerations to keep in mind for partitioned queues.
### Error and Transaction Handling
* **Deadletter Queues** - The [DeadletterQueue](./DeadletterQueue) sample shows how to use the deadletter queue for setting aside
messages that cannot be processed, and how to receive from the deadletter queue to inspect, repair, and resubmit such messages.
* **Time To Live** - The [TimeToLive](./TimeToLive) example shows the basic functionality of the TimeToLive option for messages as
well as handling of the deadletter queue where messages can optionally be stored by the system as they expire.
* **Atomic Transactions** - Service Bus supports wrapping [AtomicTransactions](./AtomicTransactions) scopes around a range of
operations, allowing for such groups of operations to either jointly succeed or fail, enabling creating more robust business
applications in the cloud.
* **Durable Senders** - The [DurableSender](./DurableSender) sample shows how to make client applications robust against frequent
network link failures.
* **Geo Replication** - The [GeoReplication](./GeoReplication) sample illustrates how to route messages through two distinct
entities, possibly located in different namespaces in differentr datacenters, to limit the application's availability risk.
### Session and Workflow Management Features
* **Sessions** - The [Sessions](./Sessions) sample shows how to enforce strict ordered processing for messages originating from
a particular context, and how to multiplex multiple distinct contexts over a single Queue or a Subscription.
* **Deferral** - The [Deferral](./Deferral) sample shows how to postpone processing of received messages by deferral, which
allows pushing messages back into a Queue or Subscription so that they can be picked up directly as the processor is
ready to handle them.
* **Session State** - The [SessionState](./SessionState) sample shows how to keep track of processing a workflow using
the session state feature.
### Windows Communication Foundation (WCF) Binding
* **NetMessagingBinding** - The [NetMessagingBinding](./NetMessagingBinding) sample shows how to use Service Bus Queues
and Topics seamlessly the context of WCF applications using the NetMessagingBinding.
* **Sessions with the NetMessagingBinding** - The [NetMessagingSession](./NetMessagingSession) sample shows how to use Service Bus
sessions with the NetMessagingBinding.

Просмотреть файл

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

Просмотреть файл

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>azure-servicebus-samples-queueclientquickstart</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

Просмотреть файл

@ -0,0 +1,5 @@
{
"sourcePath": ["src"],
"classPathFile": "classpath.txt",
"outputDirectory": "target/classes"
}

20
samples/Java/azure-servicebus/ReceiveLoop/.vscode/launch.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,20 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Java Console App",
"type": "java",
"request": "launch",
"stopOnEntry": true,
"cwd": "${workspaceRoot}",
"startupClass": "com.microsoft.azure.servicebus.samples.queueclientquickstart.QueueClientQuickstart",
"externalConsole": true,
"jdkPath": "${env:JAVA_HOME}/bin",
"sourcePath": ["${workspaceRoot}/src/main/java"], // Indicates where your source (.java) files are
"classpath": ["${workspaceRoot}", "${workspaceRoot}/target/classes"], // Indicates the location of your .class files
"preLaunchTask": "verify",
"options": ["-jar","${workspaceRoot}/target/azure-servicebus-samples-queueclientquickstart-1.0.0-jar-with-dependencies.jar"], // Additional options to pass to the java executable
"args": [] // Command line arguments to pass to the startup class
}
]
}

Просмотреть файл

@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}

Некоторые файлы не были показаны из-за слишком большого количества измененных файлов Показать больше