Add async send sample (#468)
Sample demonstrates async send, the benefits of concurrent sends, and how to limit the number of concurrent sends.
This commit is contained in:
Родитель
70cbe0fadf
Коммит
898ba19e26
|
@ -0,0 +1,68 @@
|
|||
# Send events asynchronously to Azure Event Hubs using Java
|
||||
|
||||
To run the sample, you need to edit the [sample code](src/main/java/com/microsoft/azure/eventhubs/samples/asyncsend/AsyncSend.java) and provide the following information:
|
||||
|
||||
```java
|
||||
final String namespaceName = "----EventHubsNamespaceName-----";
|
||||
final String eventHubName = "----EventHubName-----";
|
||||
final String sasKeyName = "-----SharedAccessSignatureKeyName-----";
|
||||
final String sasKey = "---SharedAccessSignatureKey----";
|
||||
```
|
||||
|
||||
## Prerequisites
|
||||
|
||||
Please refer to the [overview README](../../readme.md) for prerequisites and setting up the sample environment, including creating an Event Hubs cloud namespace and an Event Hub.
|
||||
|
||||
## Build and run
|
||||
|
||||
The sample can be built independently with
|
||||
|
||||
```bash
|
||||
mvn clean package
|
||||
```
|
||||
|
||||
and then run with (or just from VS Code or another Java IDE)
|
||||
|
||||
```bash
|
||||
java -jar ./target/asyncsend-1.0.0-jar-with-dependencies.jar
|
||||
```
|
||||
|
||||
## Understanding Async Send
|
||||
|
||||
This sample demonstrates using the asynchronous send() method on the EventHubClient class.
|
||||
|
||||
The corresponding sendSync() method is pretty simple: it waits for the the send operation to
|
||||
be complete, then returns, indicating success, or throws, indicating an error. Unfortunately, it's
|
||||
not fast, because it means waiting for the message to go across the network to the service, for
|
||||
the service to store the message, and then for the result to come back across the network.
|
||||
|
||||
Switching to the asynchronous send() means your code must be more
|
||||
sophisticated. By its nature, an asynchronous method just starts an operation and does not wait
|
||||
for it to finish, so your code can be doing something useful while the operation proceeds in the
|
||||
background. However, that means that the method itself cannot tell
|
||||
you anything about the result of the operation, because there is no result yet when the method
|
||||
returns. Java provides a standard class, CompletableFuture, for asynchronous methods to return,
|
||||
which allows the calling code to obtain the result of the operation at a later time, and the Event
|
||||
Hubs Java client makes use of that class. CompletableFuture offers some sophisticated abilities,
|
||||
but this sample simply retrieves the result by calling get(), which will return when the operation
|
||||
finishes (or has already finished) successfully, or throw an exception if there was an error.
|
||||
|
||||
One of the things your code can be doing while waiting for a send operation to finish is to prepare
|
||||
and send additional messages, which this sample demonstrates. This is known as concurrent sending
|
||||
and it is a good way to increase your throughput. The sample sends three groups of 100 messages each,
|
||||
with one concurrent send, ten concurrent sends, and thirty concurrent sends respectively. It uses a
|
||||
queue to save the CompletableFuture instances returned by send(), and when the queue grows larger
|
||||
than the desired number of concurrent operations, it waits for the oldest CompletableFuture to
|
||||
complete. One concurrent send is equivalent to making a synchronous call: each send must wait for the
|
||||
previous one to finish. With ten concurrent sends, the average wait time and total execution time is
|
||||
much smaller, because by the time the 11th send is attempted, the first send is mostly done, and the
|
||||
same for the 12th send and the second, etc. With thirty concurrent sends, the average wait time is
|
||||
nearly 0, because the first send is already finished by the time the 31st is attempted, etc. The
|
||||
useful number of concurrent calls is limited by memory usage, CPU usage, network congestion, and other
|
||||
factors, and we do not recommend trying more than about 100.
|
||||
|
||||
Error handling is also different when using asynchronous calls. Supposing the get() on the CompletableFuture
|
||||
throws an error, indicating a failure, but you still want that message and would like to retry sending
|
||||
it -- how do you know what message failed? This sample demonstrates one possible approach, by storing
|
||||
the original message with the CompletableFuture. There are many possibilities and which is best
|
||||
depends on the structure of your application.
|
|
@ -0,0 +1,53 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>asyncsend</groupId>
|
||||
<version>1.0.0</version>
|
||||
<artifactId>asyncsend</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.6.1</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<debug>true</debug>
|
||||
<debuglevel>lines,vars,source</debuglevel>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<descriptorRefs>
|
||||
<descriptorRef>jar-with-dependencies</descriptorRef>
|
||||
</descriptorRefs>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>com.microsoft.azure.eventhubs.samples.AsyncSend.AsyncSend</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>azure-eventhubs</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft. All rights reserved.
|
||||
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
*/
|
||||
package com.microsoft.azure.eventhubs.samples.AsyncSend;
|
||||
|
||||
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.eventhubs.EventData;
|
||||
import com.microsoft.azure.eventhubs.EventHubClient;
|
||||
import com.microsoft.azure.eventhubs.EventHubException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.time.Instant;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
public class AsyncSend {
|
||||
|
||||
public static void main(String[] args)
|
||||
throws EventHubException, ExecutionException, InterruptedException, IOException {
|
||||
|
||||
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
|
||||
.setNamespaceName("Your Event Hubs namespace name") // to target National clouds - use .setEndpoint(URI)
|
||||
.setEventHubName("Your event hub")
|
||||
.setSasKeyName("Your policy name")
|
||||
.setSasKey("Your primary SAS key");
|
||||
|
||||
// The Executor handles all asynchronous tasks and this is passed to the
|
||||
// EventHubClient instance. This enables the user to segregate their thread
|
||||
// pool based on the work load. This pool can then be shared across multiple
|
||||
// EventHubClient instances.
|
||||
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
|
||||
|
||||
// Each EventHubClient instance spins up a new TCP/SSL connection, which is
|
||||
// expensive. It is always a best practice to reuse these instances.
|
||||
final EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(connStr.toString(), executorService);
|
||||
|
||||
send100Messages(ehClient, 1);
|
||||
System.out.println();
|
||||
|
||||
send100Messages(ehClient, 10);
|
||||
System.out.println();
|
||||
|
||||
send100Messages(ehClient, 30);
|
||||
System.out.println();
|
||||
|
||||
ehClient.closeSync();
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
public static void send100Messages(final EventHubClient ehClient, final int inFlightMax) {
|
||||
System.out.println("Sending 100 messages with " + inFlightMax + " concurrent sends");
|
||||
|
||||
LinkedList<MessageAndResult> inFlight = new LinkedList<MessageAndResult>();
|
||||
long totalWait = 0L;
|
||||
long totalStart = Instant.now().toEpochMilli();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
String payload = "Message " + Integer.toString(i);
|
||||
byte[] payloadBytes = payload.getBytes(Charset.defaultCharset());
|
||||
MessageAndResult mar = new MessageAndResult(i, EventData.create(payloadBytes));
|
||||
long startWait = Instant.now().toEpochMilli();
|
||||
|
||||
if (inFlight.size() >= inFlightMax) {
|
||||
MessageAndResult oldest = inFlight.remove();
|
||||
try {
|
||||
oldest.result.get();
|
||||
//System.out.println("Completed send of message " + oldest.messageNumber + ": succeeded");
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
System.out.println("Completed send of message " + oldest.messageNumber + ": failed: " + e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
long waitTime = Instant.now().toEpochMilli() - startWait;
|
||||
totalWait += waitTime;
|
||||
//System.out.println("Blocked time waiting to send (ms): " + waitTime);
|
||||
mar.result = ehClient.send(mar.message);
|
||||
//System.out.println("Started send of message " + mar.messageNumber);
|
||||
inFlight.add(mar);
|
||||
}
|
||||
|
||||
System.out.println("All sends started");
|
||||
|
||||
while (inFlight.size() > 0) {
|
||||
MessageAndResult oldest = inFlight.remove();
|
||||
try {
|
||||
oldest.result.get();
|
||||
//System.out.println("Completed send of message " + oldest.messageNumber + ": succeeded");
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
System.out.println("Completed send of message " + oldest.messageNumber + ": failed: " + e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("All sends completed, average blocked time (ms): " + (totalWait / 100L));
|
||||
System.out.println("Total time to send 100 messages (ms): " + (Instant.now().toEpochMilli() - totalStart));
|
||||
}
|
||||
|
||||
private static class MessageAndResult {
|
||||
public final int messageNumber;
|
||||
public final EventData message;
|
||||
public CompletableFuture<Void> result;
|
||||
|
||||
public MessageAndResult(final int messageNumber, final EventData message) {
|
||||
this.messageNumber = messageNumber;
|
||||
this.message = message;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@
|
|||
<module>Basic/AdvancedSendOptions</module>
|
||||
<module>Basic/SendBatch</module>
|
||||
<module>Basic/SimpleSend</module>
|
||||
<module>Basic/AsyncSend</module>
|
||||
<module>Basic/SimpleProxy</module>
|
||||
<module>Benchmarks/AutoScaleOnIngress</module>
|
||||
<module>Benchmarks/IngressBenchmark</module>
|
||||
|
|
Загрузка…
Ссылка в новой задаче