Sample demonstrates async send, the benefits of concurrent sends, and how to limit the number of concurrent sends.
This commit is contained in:
JamesBirdsall 2020-04-22 10:06:23 -07:00 коммит произвёл GitHub
Родитель 1b66691ce2
Коммит 3527fe90a9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 234 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,68 @@
# Send events asynchronously to Azure Event Hubs using Java
To run the sample, you need to edit the [sample code](src/main/java/com/microsoft/azure/eventhubs/samples/asyncsend/AsyncSend.java) and provide the following information:
```java
final String namespaceName = "----EventHubsNamespaceName-----";
final String eventHubName = "----EventHubName-----";
final String sasKeyName = "-----SharedAccessSignatureKeyName-----";
final String sasKey = "---SharedAccessSignatureKey----";
```
## Prerequisites
Please refer to the [overview README](../../readme.md) for prerequisites and setting up the sample environment, including creating an Event Hubs cloud namespace and an Event Hub.
## Build and run
The sample can be built independently with
```bash
mvn clean package
```
and then run with (or just from VS Code or another Java IDE)
```bash
java -jar ./target/asyncsend-1.0.0-jar-with-dependencies.jar
```
## Understanding Async Send
This sample demonstrates using the asynchronous send() method on the EventHubClient class.
The corresponding sendSync() method is pretty simple: it waits for the the send operation to
be complete, then returns, indicating success, or throws, indicating an error. Unfortunately, it's
not fast, because it means waiting for the message to go across the network to the service, for
the service to store the message, and then for the result to come back across the network.
Switching to the asynchronous send() means your code must be more
sophisticated. By its nature, an asynchronous method just starts an operation and does not wait
for it to finish, so your code can be doing something useful while the operation proceeds in the
background. However, that means that the method itself cannot tell
you anything about the result of the operation, because there is no result yet when the method
returns. Java provides a standard class, CompletableFuture, for asynchronous methods to return,
which allows the calling code to obtain the result of the operation at a later time, and the Event
Hubs Java client makes use of that class. CompletableFuture offers some sophisticated abilities,
but this sample simply retrieves the result by calling get(), which will return when the operation
finishes (or has already finished) successfully, or throw an exception if there was an error.
One of the things your code can be doing while waiting for a send operation to finish is to prepare
and send additional messages, which this sample demonstrates. This is known as concurrent sending
and it is a good way to increase your throughput. The sample sends three groups of 100 messages each,
with one concurrent send, ten concurrent sends, and thirty concurrent sends respectively. It uses a
queue to save the CompletableFuture instances returned by send(), and when the queue grows larger
than the desired number of concurrent operations, it waits for the oldest CompletableFuture to
complete. One concurrent send is equivalent to making a synchronous call: each send must wait for the
previous one to finish. With ten concurrent sends, the average wait time and total execution time is
much smaller, because by the time the 11th send is attempted, the first send is mostly done, and the
same for the 12th send and the second, etc. With thirty concurrent sends, the average wait time is
nearly 0, because the first send is already finished by the time the 31st is attempted, etc. The
useful number of concurrent calls is limited by memory usage, CPU usage, network congestion, and other
factors, and we do not recommend trying more than about 100.
Error handling is also different when using asynchronous calls. Supposing the get() on the CompletableFuture
throws an error, indicating a failure, but you still want that message and would like to retry sending
it -- how do you know what message failed? This sample demonstrates one possible approach, by storing
the original message with the CompletableFuture. There are many possibilities and which is best
depends on the structure of your application.

Просмотреть файл

@ -0,0 +1,53 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>asyncsend</groupId>
<version>1.0.0</version>
<artifactId>asyncsend</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<debuglevel>lines,vars,source</debuglevel>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.microsoft.azure.eventhubs.samples.AsyncSend.AsyncSend</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -0,0 +1,112 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs.samples.AsyncSend;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class AsyncSend {
public static void main(String[] args)
throws EventHubException, ExecutionException, InterruptedException, IOException {
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName("Your Event Hubs namespace name") // to target National clouds - use .setEndpoint(URI)
.setEventHubName("Your event hub")
.setSasKeyName("Your policy name")
.setSasKey("Your primary SAS key");
// The Executor handles all asynchronous tasks and this is passed to the
// EventHubClient instance. This enables the user to segregate their thread
// pool based on the work load. This pool can then be shared across multiple
// EventHubClient instances.
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
// Each EventHubClient instance spins up a new TCP/SSL connection, which is
// expensive. It is always a best practice to reuse these instances.
final EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(connStr.toString(), executorService);
send100Messages(ehClient, 1);
System.out.println();
send100Messages(ehClient, 10);
System.out.println();
send100Messages(ehClient, 30);
System.out.println();
ehClient.closeSync();
executorService.shutdown();
}
public static void send100Messages(final EventHubClient ehClient, final int inFlightMax) {
System.out.println("Sending 100 messages with " + inFlightMax + " concurrent sends");
LinkedList<MessageAndResult> inFlight = new LinkedList<MessageAndResult>();
long totalWait = 0L;
long totalStart = Instant.now().toEpochMilli();
for (int i = 0; i < 100; i++) {
String payload = "Message " + Integer.toString(i);
byte[] payloadBytes = payload.getBytes(Charset.defaultCharset());
MessageAndResult mar = new MessageAndResult(i, EventData.create(payloadBytes));
long startWait = Instant.now().toEpochMilli();
if (inFlight.size() >= inFlightMax) {
MessageAndResult oldest = inFlight.remove();
try {
oldest.result.get();
//System.out.println("Completed send of message " + oldest.messageNumber + ": succeeded");
} catch (InterruptedException | ExecutionException e) {
System.out.println("Completed send of message " + oldest.messageNumber + ": failed: " + e.toString());
}
}
long waitTime = Instant.now().toEpochMilli() - startWait;
totalWait += waitTime;
//System.out.println("Blocked time waiting to send (ms): " + waitTime);
mar.result = ehClient.send(mar.message);
//System.out.println("Started send of message " + mar.messageNumber);
inFlight.add(mar);
}
System.out.println("All sends started");
while (inFlight.size() > 0) {
MessageAndResult oldest = inFlight.remove();
try {
oldest.result.get();
//System.out.println("Completed send of message " + oldest.messageNumber + ": succeeded");
} catch (InterruptedException | ExecutionException e) {
System.out.println("Completed send of message " + oldest.messageNumber + ": failed: " + e.toString());
}
}
System.out.println("All sends completed, average blocked time (ms): " + (totalWait / 100L));
System.out.println("Total time to send 100 messages (ms): " + (Instant.now().toEpochMilli() - totalStart));
}
private static class MessageAndResult {
public final int messageNumber;
public final EventData message;
public CompletableFuture<Void> result;
public MessageAndResult(final int messageNumber, final EventData message) {
this.messageNumber = messageNumber;
this.message = message;
}
}
}

Просмотреть файл

@ -29,6 +29,7 @@
<module>Basic/AdvancedSendOptions</module>
<module>Basic/SendBatch</module>
<module>Basic/SimpleSend</module>
<module>Basic/AsyncSend</module>
<module>Basic/SimpleProxy</module>
<module>Benchmarks/AutoScaleOnIngress</module>
<module>Benchmarks/IngressBenchmark</module>