Merge pull request #414 from basilhariri/master
Quickstart dependency update, README update, and producer clean up
This commit is contained in:
Коммит
9831678e39
|
@ -87,4 +87,6 @@ mvn clean package
|
|||
mvn exec:java -Dexec.mainClass="TestConsumer"
|
||||
```
|
||||
|
||||
If the Kafka-enabled Event Hub has queued events (for instance, if your producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/TestConsumer.java`.
|
||||
If the Kafka-enabled Event Hub has incoming events (for instance, if your example producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/TestConsumer.java`.
|
||||
|
||||
By default, Kafka consumers will read from the end of the stream rather than the beginning. This means any events queued before you begin running your consumer will not be read. If you started your consumer but it isn't receiving any events, try running your producer again while your consumer is polling. Alternatively, you can use Kafka's [`auto.offset.reset` consumer config](https://kafka.apache.org/documentation/#newconsumerconfigs) to make your consumer read from the beginning of the stream!
|
|
@ -17,13 +17,13 @@
|
|||
<!--v1.0-->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.12</artifactId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
<!--v1.1-->
|
||||
<!-- <dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.12</artifactId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>1.1.0</version>
|
||||
</dependency> -->
|
||||
</dependencies>
|
||||
|
|
|
@ -17,13 +17,13 @@
|
|||
<!--v1.0-->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.12</artifactId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
<!--v1.1-->
|
||||
<!-- <dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.12</artifactId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>1.1.0</version>
|
||||
</dependency> -->
|
||||
</dependencies>
|
||||
|
|
|
@ -6,7 +6,9 @@ import java.sql.Timestamp;
|
|||
|
||||
public class TestDataReporter implements Runnable {
|
||||
|
||||
private static final int NUM_MESSAGES = 100;
|
||||
private final String TOPIC;
|
||||
|
||||
private Producer<Long, String> producer;
|
||||
|
||||
public TestDataReporter(final Producer<Long, String> producer, String TOPIC) {
|
||||
|
@ -16,16 +18,20 @@ public class TestDataReporter implements Runnable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
for(int i = 0; i < 1000; i++) {
|
||||
for(int i = 0; i < NUM_MESSAGES; i++) {
|
||||
long time = System.currentTimeMillis();
|
||||
System.out.println("Test Data #" + i + " from thread " + Thread.currentThread().getId());
|
||||
System.out.println("Test Data #" + i + " from thread #" + Thread.currentThread().getId());
|
||||
|
||||
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data #" + i);
|
||||
producer.send(record, new Callback() {
|
||||
public void onCompletion(RecordMetadata metadata, Exception exception) {
|
||||
if (exception != null) {
|
||||
System.out.println(exception);
|
||||
System.exit(1);
|
||||
}}});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
System.out.println("Finished sending " + NUM_MESSAGES + " messages from thread #" + Thread.currentThread.getId() + "!");
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче