diff --git a/samples/Java/Basic/AsyncSend/README.md b/samples/Java/Basic/AsyncSend/README.md new file mode 100644 index 00000000..2cd77923 --- /dev/null +++ b/samples/Java/Basic/AsyncSend/README.md @@ -0,0 +1,68 @@ +# Send events asynchronously to Azure Event Hubs using Java + +To run the sample, you need to edit the [sample code](src/main/java/com/microsoft/azure/eventhubs/samples/asyncsend/AsyncSend.java) and provide the following information: + +```java + final String namespaceName = "----EventHubsNamespaceName-----"; + final String eventHubName = "----EventHubName-----"; + final String sasKeyName = "-----SharedAccessSignatureKeyName-----"; + final String sasKey = "---SharedAccessSignatureKey----"; +``` + +## Prerequisites + +Please refer to the [overview README](../../readme.md) for prerequisites and setting up the sample environment, including creating an Event Hubs cloud namespace and an Event Hub. + +## Build and run + +The sample can be built independently with + +```bash +mvn clean package +``` + +and then run with (or just from VS Code or another Java IDE) + +```bash +java -jar ./target/asyncsend-1.0.0-jar-with-dependencies.jar +``` + +## Understanding Async Send + +This sample demonstrates using the asynchronous send() method on the EventHubClient class. + +The corresponding sendSync() method is pretty simple: it waits for the the send operation to +be complete, then returns, indicating success, or throws, indicating an error. Unfortunately, it's +not fast, because it means waiting for the message to go across the network to the service, for +the service to store the message, and then for the result to come back across the network. + +Switching to the asynchronous send() means your code must be more +sophisticated. By its nature, an asynchronous method just starts an operation and does not wait +for it to finish, so your code can be doing something useful while the operation proceeds in the +background. However, that means that the method itself cannot tell +you anything about the result of the operation, because there is no result yet when the method +returns. Java provides a standard class, CompletableFuture, for asynchronous methods to return, +which allows the calling code to obtain the result of the operation at a later time, and the Event +Hubs Java client makes use of that class. CompletableFuture offers some sophisticated abilities, +but this sample simply retrieves the result by calling get(), which will return when the operation +finishes (or has already finished) successfully, or throw an exception if there was an error. + +One of the things your code can be doing while waiting for a send operation to finish is to prepare +and send additional messages, which this sample demonstrates. This is known as concurrent sending +and it is a good way to increase your throughput. The sample sends three groups of 100 messages each, +with one concurrent send, ten concurrent sends, and thirty concurrent sends respectively. It uses a +queue to save the CompletableFuture instances returned by send(), and when the queue grows larger +than the desired number of concurrent operations, it waits for the oldest CompletableFuture to +complete. One concurrent send is equivalent to making a synchronous call: each send must wait for the +previous one to finish. With ten concurrent sends, the average wait time and total execution time is +much smaller, because by the time the 11th send is attempted, the first send is mostly done, and the +same for the 12th send and the second, etc. With thirty concurrent sends, the average wait time is +nearly 0, because the first send is already finished by the time the 31st is attempted, etc. The +useful number of concurrent calls is limited by memory usage, CPU usage, network congestion, and other +factors, and we do not recommend trying more than about 100. + +Error handling is also different when using asynchronous calls. Supposing the get() on the CompletableFuture +throws an error, indicating a failure, but you still want that message and would like to retry sending +it -- how do you know what message failed? This sample demonstrates one possible approach, by storing +the original message with the CompletableFuture. There are many possibilities and which is best +depends on the structure of your application. \ No newline at end of file diff --git a/samples/Java/Basic/AsyncSend/pom.xml b/samples/Java/Basic/AsyncSend/pom.xml new file mode 100644 index 00000000..2b976050 --- /dev/null +++ b/samples/Java/Basic/AsyncSend/pom.xml @@ -0,0 +1,53 @@ + + 4.0.0 + asyncsend + 1.0.0 + asyncsend + jar + + UTF-8 + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + 1.8 + 1.8 + true + lines,vars,source + + + + maven-assembly-plugin + + + package + + single + + + + + + jar-with-dependencies + + + + com.microsoft.azure.eventhubs.samples.AsyncSend.AsyncSend + + + + + + + + + com.microsoft.azure + azure-eventhubs + 3.1.1 + + + diff --git a/samples/Java/Basic/AsyncSend/src/main/java/com/microsoft/azure/eventhubs/samples/AsyncSend/AsyncSend.java b/samples/Java/Basic/AsyncSend/src/main/java/com/microsoft/azure/eventhubs/samples/AsyncSend/AsyncSend.java new file mode 100644 index 00000000..38bfe47e --- /dev/null +++ b/samples/Java/Basic/AsyncSend/src/main/java/com/microsoft/azure/eventhubs/samples/AsyncSend/AsyncSend.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.samples.AsyncSend; + +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.time.Instant; +import java.util.LinkedList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +public class AsyncSend { + + public static void main(String[] args) + throws EventHubException, ExecutionException, InterruptedException, IOException { + + final ConnectionStringBuilder connStr = new ConnectionStringBuilder() + .setNamespaceName("Your Event Hubs namespace name") // to target National clouds - use .setEndpoint(URI) + .setEventHubName("Your event hub") + .setSasKeyName("Your policy name") + .setSasKey("Your primary SAS key"); + + // The Executor handles all asynchronous tasks and this is passed to the + // EventHubClient instance. This enables the user to segregate their thread + // pool based on the work load. This pool can then be shared across multiple + // EventHubClient instances. + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); + + // Each EventHubClient instance spins up a new TCP/SSL connection, which is + // expensive. It is always a best practice to reuse these instances. + final EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(connStr.toString(), executorService); + + send100Messages(ehClient, 1); + System.out.println(); + + send100Messages(ehClient, 10); + System.out.println(); + + send100Messages(ehClient, 30); + System.out.println(); + + ehClient.closeSync(); + executorService.shutdown(); + } + + public static void send100Messages(final EventHubClient ehClient, final int inFlightMax) { + System.out.println("Sending 100 messages with " + inFlightMax + " concurrent sends"); + + LinkedList inFlight = new LinkedList(); + long totalWait = 0L; + long totalStart = Instant.now().toEpochMilli(); + + for (int i = 0; i < 100; i++) { + String payload = "Message " + Integer.toString(i); + byte[] payloadBytes = payload.getBytes(Charset.defaultCharset()); + MessageAndResult mar = new MessageAndResult(i, EventData.create(payloadBytes)); + long startWait = Instant.now().toEpochMilli(); + + if (inFlight.size() >= inFlightMax) { + MessageAndResult oldest = inFlight.remove(); + try { + oldest.result.get(); + //System.out.println("Completed send of message " + oldest.messageNumber + ": succeeded"); + } catch (InterruptedException | ExecutionException e) { + System.out.println("Completed send of message " + oldest.messageNumber + ": failed: " + e.toString()); + } + } + + long waitTime = Instant.now().toEpochMilli() - startWait; + totalWait += waitTime; + //System.out.println("Blocked time waiting to send (ms): " + waitTime); + mar.result = ehClient.send(mar.message); + //System.out.println("Started send of message " + mar.messageNumber); + inFlight.add(mar); + } + + System.out.println("All sends started"); + + while (inFlight.size() > 0) { + MessageAndResult oldest = inFlight.remove(); + try { + oldest.result.get(); + //System.out.println("Completed send of message " + oldest.messageNumber + ": succeeded"); + } catch (InterruptedException | ExecutionException e) { + System.out.println("Completed send of message " + oldest.messageNumber + ": failed: " + e.toString()); + } + } + + System.out.println("All sends completed, average blocked time (ms): " + (totalWait / 100L)); + System.out.println("Total time to send 100 messages (ms): " + (Instant.now().toEpochMilli() - totalStart)); + } + + private static class MessageAndResult { + public final int messageNumber; + public final EventData message; + public CompletableFuture result; + + public MessageAndResult(final int messageNumber, final EventData message) { + this.messageNumber = messageNumber; + this.message = message; + } + } +} diff --git a/samples/Java/pom.xml b/samples/Java/pom.xml index 8dceb9e1..dec4a357 100644 --- a/samples/Java/pom.xml +++ b/samples/Java/pom.xml @@ -29,6 +29,7 @@ Basic/AdvancedSendOptions Basic/SendBatch Basic/SimpleSend + Basic/AsyncSend Basic/SimpleProxy Benchmarks/AutoScaleOnIngress Benchmarks/IngressBenchmark