diff --git a/samples/kafka/quickstart/README.md b/samples/kafka/quickstart/README.md
index 5bc179f7..9958cd29 100644
--- a/samples/kafka/quickstart/README.md
+++ b/samples/kafka/quickstart/README.md
@@ -87,4 +87,6 @@ mvn clean package
mvn exec:java -Dexec.mainClass="TestConsumer"
```
-If the Kafka-enabled Event Hub has queued events (for instance, if your producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/TestConsumer.java`.
+If the Kafka-enabled Event Hub has incoming events (for instance, if your example producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/TestConsumer.java`.
+
+By default, Kafka consumers will read from the end of the stream rather than the beginning. This means any events queued before you begin running your consumer will not be read. If you started your consumer but it isn't receiving any events, try running your producer again while your consumer is polling. Alternatively, you can use Kafka's [`auto.offset.reset` consumer config](https://kafka.apache.org/documentation/#newconsumerconfigs) to make your consumer read from the beginning of the stream!
\ No newline at end of file
diff --git a/samples/kafka/quickstart/consumer/pom.xml b/samples/kafka/quickstart/consumer/pom.xml
index 6ba4fb77..e141592c 100644
--- a/samples/kafka/quickstart/consumer/pom.xml
+++ b/samples/kafka/quickstart/consumer/pom.xml
@@ -17,13 +17,13 @@
org.apache.kafka
- kafka_2.12
+ kafka-clients
1.0.0
diff --git a/samples/kafka/quickstart/producer/pom.xml b/samples/kafka/quickstart/producer/pom.xml
index 229dafeb..2693e536 100644
--- a/samples/kafka/quickstart/producer/pom.xml
+++ b/samples/kafka/quickstart/producer/pom.xml
@@ -17,13 +17,13 @@
org.apache.kafka
- kafka_2.12
+ kafka-clients
1.0.0
diff --git a/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java b/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java
index 596e292b..daa91d3b 100644
--- a/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java
+++ b/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java
@@ -6,7 +6,9 @@ import java.sql.Timestamp;
public class TestDataReporter implements Runnable {
+ private static final int NUM_MESSAGES = 100;
private final String TOPIC;
+
private Producer producer;
public TestDataReporter(final Producer producer, String TOPIC) {
@@ -16,16 +18,20 @@ public class TestDataReporter implements Runnable {
@Override
public void run() {
- for(int i = 0; i < 1000; i++) {
- long time = System.currentTimeMillis();
- System.out.println("Test Data #" + i + " from thread " + Thread.currentThread().getId());
- final ProducerRecord record = new ProducerRecord(TOPIC, time, "Test Data #" + i);
- producer.send(record, new Callback() {
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- System.out.println(exception);
- System.exit(1);
- }}});
+ for(int i = 0; i < NUM_MESSAGES; i++) {
+ long time = System.currentTimeMillis();
+ System.out.println("Test Data #" + i + " from thread #" + Thread.currentThread().getId());
+
+ final ProducerRecord record = new ProducerRecord(TOPIC, time, "Test Data #" + i);
+ producer.send(record, new Callback() {
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ System.out.println(exception);
+ System.exit(1);
+ }
+ }
+ });
}
+ System.out.println("Finished sending " + NUM_MESSAGES + " messages from thread #" + Thread.currentThread.getId() + "!");
}
}
\ No newline at end of file