Kasun Indrasiri 2022-11-11 11:30:19 -08:00
Parent 60ad61c1cf
Commit e1ce3c6931
8 changed files with 33 additions and 20 deletions

View file

@@ -1,6 +1,8 @@
# Send and Receive Messages in Java using Azure Event Hubs for Apache Kafka Ecosystems
# Use Kafka Compaction with Azure Event Hubs
This quickstart will show how to create and connect to an Event Hubs Kafka endpoint using an example producer and consumer written in Java. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
This quickstart shows how you can use Kafka compaction with Azure Event Hubs. With the log compaction feature of Event Hubs, you get an event key-based retention mechanism in which Event Hubs retains only the last known value for each event key of an event hub or Kafka topic.
In this quickstart, the example producer application publishes a series of events and then publishes updated events for the same set of keys. Therefore, once the compaction job for the event hub/topic completes, the consumer should see only the updated events.
## Prerequisites
@@ -20,6 +22,10 @@ In addition:
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
## Create a compacted event hub/Kafka topic
You can create a new event hub inside the namespace that you created in the previous step. To create an event hub/Kafka topic with log compaction enabled, make sure you set the *compaction policy* to *compaction* and provide the desired value for *tombstone retention time*. See [Create an event hub](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-create#create-an-event-hub) for instructions on how to create an event hub using the Azure portal.
### FQDN
For these samples, you will need the connection string from the portal as well as the FQDN that points to your Event Hubs namespace. **The FQDN can be found within your connection string as follows**:
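As an illustration (not part of the sample code), a small Java helper like the following can pull the FQDN out of a connection string; the `FqdnExtractor` class, its method, and the connection string below are placeholders for this sketch:

```java
// Illustrative helper: extract the FQDN from an Event Hubs connection string.
// Connection strings look like:
//   Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...
public class FqdnExtractor {
    public static String fqdnFrom(String connectionString) {
        for (String part : connectionString.split(";")) {
            if (part.startsWith("Endpoint=sb://")) {
                String host = part.substring("Endpoint=sb://".length());
                // Drop the trailing slash to leave just the FQDN.
                return host.endsWith("/") ? host.substring(0, host.length() - 1) : host;
            }
        }
        throw new IllegalArgumentException("No Endpoint found in connection string");
    }

    public static void main(String[] args) {
        String cs = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX";
        System.out.println(fqdnFrom(cs)); // mynamespace.servicebus.windows.net
    }
}
```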
@@ -30,16 +36,19 @@ If your Event Hubs namespace is deployed on a non-Public cloud, your domain name
## Clone the example project
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `quickstart/java` subfolder:
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `compaction/java` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/quickstart/java
cd azure-event-hubs-for-kafka/tutorials/compaction/java
```
## Producer
Using the provided producer example, send messages to the Event Hubs service. To change the Kafka version, change the dependency in the pom file to the desired version.
Using the provided producer example, send messages to the Event Hubs service.
The producer application publishes 100 events (with event values prefixed `V1_`) using keys 1 to 100, and then publishes another set of updated events (with values prefixed `V2_`) for keys 1 to 50.
Therefore, once the Kafka topic is compacted, the consumer application should see only the updated events for keys 1 to 50.
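The compaction semantics described above can be sketched with a plain map, since a compacted topic eventually keeps only the latest value per key (illustrative only; `CompactionModel` is not part of the sample):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of log compaction: a map keyed by event key models "last value wins".
public class CompactionModel {
    public static Map<String, String> compact() {
        Map<String, String> latestByKey = new LinkedHashMap<>();
        // Initial events for keys 1-100.
        for (int i = 1; i <= 100; i++) latestByKey.put("Key-" + i, "V1_" + i);
        // Updated events for keys 1-50 overwrite the earlier values.
        for (int i = 1; i <= 50; i++) latestByKey.put("Key-" + i, "V2_" + i);
        return latestByKey;
    }

    public static void main(String[] args) {
        Map<String, String> compacted = compact();
        System.out.println(compacted.get("Key-10")); // V2_10 (updated)
        System.out.println(compacted.get("Key-80")); // V1_80 (never updated)
    }
}
```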
### Provide an Event Hubs Kafka endpoint
@@ -56,7 +65,7 @@ sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule require
### Run producer from command line
This sample is configured to send messages to topic `test`, if you would like to change the topic, change the TOPIC constant in `producer/src/main/java/TestProducer.java`.
This sample is configured to send messages to topic `contoso-compacted`. If you would like to change the topic, change the `TOPIC` constant in `producer/src/main/java/TestProducer.java`.
To run the producer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
@@ -65,11 +74,11 @@ mvn clean package
mvn exec:java -Dexec.mainClass="TestProducer"
```
The producer will now begin sending events to the Kafka-enabled Event Hub at topic `test` (or whatever topic you chose) and printing the events to stdout.
The producer will now begin sending events to the Kafka-enabled Event Hub at topic `contoso-compacted` (or whatever topic you chose) and printing the events to stdout.
## Consumer
Using the provided consumer example, receive messages from the Kafka-enabled Event Hubs. To change the Kafka version, change the dependency in the pom file to the desired version.
Before running the consumer, wait a few minutes so that the topic compaction job completes. Then you can use the provided consumer example to receive messages from the Kafka API of Event Hubs. If the compaction job has completed successfully, you should see only the updated events (event payload/value with the `V2_` prefix) for keys 1 to 50.
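If you want to sanity-check what the consumer saw, a small hypothetical helper like the one below can verify that the latest value received for each of keys 1 to 50 is an updated one (`CompactionCheck` and its method name are assumptions for this sketch, not part of the sample):

```java
import java.util.Map;

// Sketch: given the latest value the consumer observed for each key,
// verify keys 1-50 carry only updated (V2_-prefixed) payloads.
public class CompactionCheck {
    public static boolean onlyUpdatedValuesForKeys1To50(Map<String, String> latestByKey) {
        for (int i = 1; i <= 50; i++) {
            String value = latestByKey.get("Key-" + i);
            if (value == null || !value.startsWith("V2_")) {
                return false; // a stale V1_ event (or a missing key) survived
            }
        }
        return true;
    }
}
```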
### Provide an Event Hubs Kafka endpoint
@@ -86,7 +95,7 @@ sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule require
### Run consumer from command line
This sample is configured to receive messages from topic `test`, if you would like to change the topic, change the TOPIC constant in `consumer/src/main/java/TestConsumer.java`.
This sample is configured to receive messages from topic `contoso-compacted`. If you would like to change the topic, change the `TOPIC` constant in `consumer/src/main/java/TestConsumer.java`.
To run the consumer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
@@ -95,6 +104,6 @@ mvn clean package
mvn exec:java -Dexec.mainClass="TestConsumer"
```
If the Kafka-enabled Event Hub has incoming events (for instance, if your example producer is also running), then the consumer should now begin receiving events from topic `test` (or whatever topic you chose).
If the Kafka-enabled Event Hub has incoming events (for instance, if your example producer is also running), then the consumer should now begin receiving events from topic `contoso-compacted` (or whatever topic you chose).
By default, Kafka consumers will read from the end of the stream rather than the beginning. This means any events queued before you begin running your consumer will not be read. If you started your consumer but it isn't receiving any events, try running your producer again while your consumer is polling. Alternatively, you can use Kafka's [`auto.offset.reset` consumer config](https://kafka.apache.org/documentation/#newconsumerconfigs) to make your consumer read from the beginning of the stream!
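For example, adding the standard Kafka consumer setting below to `consumer.config` makes a fresh consumer group start from the earliest available offset instead of the end of the stream (a sketch of one option; the default behavior corresponds to `latest`):

```properties
auto.offset.reset=earliest
```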

View file

@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-java-consumer</artifactId>
<artifactId>event-hubs-kafka-java-consumer-for-compaction</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>

View file

@@ -5,7 +5,7 @@ import java.util.concurrent.Executors;
public class TestConsumer {
//Change constant to receive messages from the desired topic
private final static String TOPIC = "eh-p4";
private final static String TOPIC = "contoso-compacted";
private final static int NUM_THREADS = 1;

View file

@@ -1,6 +1,6 @@
bootstrap.servers=myns:9093
bootstrap.servers=mynamespace.servicebus.windows.net:9093
group.id=$Default
request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myns/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxx";
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX";

View file

@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-java-producer</artifactId>
<artifactId>event-hubs-kafka-java-producer-for-compaction</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>

View file

@@ -32,7 +32,9 @@ public class TestDataReporter implements Runnable {
for(int i = 0; i < 100; i++) {
System.out.println("Publishing event: Key-" + i );
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Key-" + Integer.toString(i), "V1_" + Integer.toString(i) + largeDummyValue);
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC,
"Key-" + Integer.toString(i),
"V1_" + Integer.toString(i) + largeDummyValue);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
@@ -45,7 +47,9 @@ public class TestDataReporter implements Runnable {
for(int i = 0; i < 50; i++) {
System.out.println("Publishing updated event: Key-" + i );
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Key-" + Integer.toString(i), "V2_" + Integer.toString(i) + largeDummyValue);
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC,
"Key-" + Integer.toString(i),
"V2_" + Integer.toString(i) + largeDummyValue);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {

View file

@@ -13,7 +13,7 @@ import java.util.concurrent.Executors;
public class TestProducer {
//Change constant to send messages to the desired topic, for this example we use 'contoso-compacted'
private final static String TOPIC = "eh-p4";
private final static String TOPIC = "contoso-compacted";
private final static int NUM_THREADS = 1;

View file

@@ -1,4 +1,4 @@
bootstrap.servers=myns:9093
bootstrap.servers=mynamespace.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myns/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxx";
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX";