From 87c2c6c3d7a2d2ed4f29f6af8f9d7d782b6a636a Mon Sep 17 00:00:00 2001 From: Basil Hariri Date: Fri, 27 Jul 2018 14:20:35 -0700 Subject: [PATCH] Quickstart dependency update, README update, and producer clean up --- samples/kafka/quickstart/README.md | 4 ++- samples/kafka/quickstart/consumer/pom.xml | 4 +-- samples/kafka/quickstart/producer/pom.xml | 4 +-- .../com/example/app/TestDataReporter.java | 26 ++++++++++++------- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/samples/kafka/quickstart/README.md b/samples/kafka/quickstart/README.md index 5bc179f7..9958cd29 100644 --- a/samples/kafka/quickstart/README.md +++ b/samples/kafka/quickstart/README.md @@ -87,4 +87,6 @@ mvn clean package mvn exec:java -Dexec.mainClass="TestConsumer" ``` -If the Kafka-enabled Event Hub has queued events (for instance, if your producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/TestConsumer.java`. +If the Kafka-enabled Event Hub has incoming events (for instance, if your example producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/TestConsumer.java`. + +By default, Kafka consumers will read from the end of the stream rather than the beginning. This means any events queued before you begin running your consumer will not be read. If you started your consumer but it isn't receiving any events, try running your producer again while your consumer is polling. Alternatively, you can use Kafka's [`auto.offset.reset` consumer config](https://kafka.apache.org/documentation/#newconsumerconfigs) to make your consumer read from the beginning of the stream! \ No newline at end of file diff --git a/samples/kafka/quickstart/consumer/pom.xml b/samples/kafka/quickstart/consumer/pom.xml index 6ba4fb77..e141592c 100644 --- a/samples/kafka/quickstart/consumer/pom.xml +++ b/samples/kafka/quickstart/consumer/pom.xml @@ -17,13 +17,13 @@ org.apache.kafka - kafka_2.12 + kafka-clients 1.0.0 diff --git a/samples/kafka/quickstart/producer/pom.xml b/samples/kafka/quickstart/producer/pom.xml index 229dafeb..2693e536 100644 --- a/samples/kafka/quickstart/producer/pom.xml +++ b/samples/kafka/quickstart/producer/pom.xml @@ -17,13 +17,13 @@ org.apache.kafka - kafka_2.12 + kafka-clients 1.0.0 diff --git a/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java b/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java index 596e292b..daa91d3b 100644 --- a/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java +++ b/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java @@ -6,7 +6,9 @@ import java.sql.Timestamp; public class TestDataReporter implements Runnable { + private static final int NUM_MESSAGES = 100; private final String TOPIC; + private Producer producer; public TestDataReporter(final Producer producer, String TOPIC) { @@ -16,16 +18,20 @@ public class TestDataReporter implements Runnable { @Override public void run() { - for(int i = 0; i < 1000; i++) { - long time = System.currentTimeMillis(); - System.out.println("Test Data #" + i + " from thread " + Thread.currentThread().getId()); - final ProducerRecord record = new ProducerRecord(TOPIC, time, "Test Data #" + i); - producer.send(record, new Callback() { - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - System.out.println(exception); - System.exit(1); - }}}); + for(int i = 0; i < NUM_MESSAGES; i++) { + long time = System.currentTimeMillis(); + System.out.println("Test Data #" + i + " from thread #" + Thread.currentThread().getId()); + + final ProducerRecord record = new ProducerRecord(TOPIC, time, "Test Data #" + i); + producer.send(record, new Callback() { + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + System.out.println(exception); + System.exit(1); + } + } + }); } + System.out.println("Finished sending " + NUM_MESSAGES + " messages from thread #" + Thread.currentThread.getId() + "!"); } } \ No newline at end of file