diff --git a/samples/kafka/akka-scala/README.md b/samples/kafka/akka-scala/README.md
deleted file mode 100644
index 07d917ef..00000000
--- a/samples/kafka/akka-scala/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# We've moved!
-
-All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
-
-The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.
-
diff --git a/samples/kafka/akka-scala/consumer/build.sbt b/samples/kafka/akka-scala/consumer/build.sbt
deleted file mode 100644
index c5942704..00000000
--- a/samples/kafka/akka-scala/consumer/build.sbt
+++ /dev/null
@@ -1,7 +0,0 @@
-name := "scala-event-hubs-consumer"
-
-version := "0.1"
-
-scalaVersion := "2.12.6"
-
-libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
\ No newline at end of file
diff --git a/samples/kafka/akka-scala/consumer/resources/application.conf b/samples/kafka/akka-scala/consumer/resources/application.conf
deleted file mode 100644
index fc06c1af..00000000
--- a/samples/kafka/akka-scala/consumer/resources/application.conf
+++ /dev/null
@@ -1,16 +0,0 @@
-akka.kafka.consumer {
- #Akka Kafka consumer properties defined here
- wakeup-timeout=60s
-
- # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
- # can be defined in this configuration section.
- kafka-clients {
- request.timeout.ms=60000
- group.id=akka-example-consumer
-
- bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
- sasl.mechanism=PLAIN
- security.protocol=SASL_SSL
- sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
- }
-}
\ No newline at end of file
diff --git a/samples/kafka/akka-scala/consumer/scala/ConsumerMain.scala b/samples/kafka/akka-scala/consumer/scala/ConsumerMain.scala
deleted file mode 100644
index bd23b71e..00000000
--- a/samples/kafka/akka-scala/consumer/scala/ConsumerMain.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-import akka.actor.ActorSystem
-import akka.kafka.scaladsl.Consumer
-import akka.kafka.{ConsumerSettings, Subscriptions}
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Sink
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
-
-import scala.concurrent.Future
-import scala.language.postfixOps
-
-object ConsumerMain {
-
-  def main(args: Array[String]): Unit = {
-    implicit val system: ActorSystem = ActorSystem.apply("akka-stream-kafka")
-    implicit val materializer: ActorMaterializer = ActorMaterializer()
-
-    // grab our settings from the resources/application.conf file
-    val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
-
-    // our topic to subscribe to for messages
-    val topic = "test"
-
-    // listen to our topic with our settings, until the program is exited
-    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
-      .mapAsync(1)(msg => {
-        // print out our message once it's received
-        println(s"Message Received : ${msg.timestamp} - ${msg.value}")
-        Future.successful(msg)
-      }).runWith(Sink.ignore)
-  }
-}
\ No newline at end of file
diff --git a/samples/kafka/akka-scala/producer/build.sbt b/samples/kafka/akka-scala/producer/build.sbt
deleted file mode 100644
index 174bbc4f..00000000
--- a/samples/kafka/akka-scala/producer/build.sbt
+++ /dev/null
@@ -1,7 +0,0 @@
-name := "scala-event-hubs-producer"
-
-version := "0.1"
-
-scalaVersion := "2.12.6"
-
-libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
\ No newline at end of file
diff --git a/samples/kafka/akka-scala/producer/resources/application.conf b/samples/kafka/akka-scala/producer/resources/application.conf
deleted file mode 100644
index 473f9247..00000000
--- a/samples/kafka/akka-scala/producer/resources/application.conf
+++ /dev/null
@@ -1,13 +0,0 @@
-akka.kafka.producer {
- #Akka kafka producer properties can be defined here
-
-
- # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
- # can be defined in this configuration section.
- kafka-clients {
- bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
- sasl.mechanism=PLAIN
- security.protocol=SASL_SSL
- sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
- }
-}
\ No newline at end of file
diff --git a/samples/kafka/akka-scala/producer/scala/ProducerMain.scala b/samples/kafka/akka-scala/producer/scala/ProducerMain.scala
deleted file mode 100644
index 770667b4..00000000
--- a/samples/kafka/akka-scala/producer/scala/ProducerMain.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-import java.util.concurrent.TimeUnit.SECONDS
-
-import akka.actor.ActorSystem
-import akka.kafka.ProducerSettings
-import akka.kafka.scaladsl.Producer
-import akka.stream.scaladsl.Source
-import akka.stream.{ActorMaterializer, ThrottleMode}
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
-
-import scala.concurrent.duration.FiniteDuration
-import scala.language.postfixOps
-
-object ProducerMain {
-
-  def main(args: Array[String]): Unit = {
-    implicit val system: ActorSystem = ActorSystem.apply("akka-stream-kafka")
-    implicit val materializer: ActorMaterializer = ActorMaterializer()
-
-    // grab our settings from the resources/application.conf file
-    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
-
-    // topic to send the message to on event hubs
-    val topic = "sampleTopic"
-
-    // send one message per second until the count reaches 100
-    Source(1 to 100)
-      .throttle(1, FiniteDuration(1, SECONDS), 1, ThrottleMode.Shaping)
-      .map(num => {
-        // construct our message here
-        val message = s"Akka Scala Producer Message # ${num}"
-        println(s"Message sent to topic - $topic - $message")
-        new ProducerRecord[Array[Byte], String](topic, message.getBytes, message)
-      })
-      .runWith(Producer.plainSink(producerSettings))
-      .onComplete(_ => {
-        println("All messages sent!")
-        system.terminate()
-      })(scala.concurrent.ExecutionContext.global)
-  }
-}
\ No newline at end of file
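
Usage note for the removed akka-scala projects — a sketch, assuming sbt is installed, the sources are arranged in sbt's standard src/main layout, and the placeholders in application.conf are filled in:

    $ cd samples/kafka/akka-scala/producer
    $ sbt run
    $ cd ../consumer
    $ sbt run
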
diff --git a/samples/kafka/akka/README.md b/samples/kafka/akka/README.md
deleted file mode 100644
index 07d917ef..00000000
--- a/samples/kafka/akka/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# We've moved!
-
-All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
-
-The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.
-
diff --git a/samples/kafka/akka/consumer/pom.xml b/samples/kafka/akka/consumer/pom.xml
deleted file mode 100644
index 78809c59..00000000
--- a/samples/kafka/akka/consumer/pom.xml
+++ /dev/null
@@ -1,57 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>com.example.app</groupId>
-    <artifactId>event-hubs-kafka-akka-consumer</artifactId>
-    <packaging>jar</packaging>
-    <version>1.0-SNAPSHOT</version>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    </properties>
-    <dependencies>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-actor_2.12</artifactId>
-            <version>2.5.16</version>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-stream-kafka_2.12</artifactId>
-            <version>0.20</version>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-testkit_2.12</artifactId>
-            <version>2.5.16</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <defaultGoal>install</defaultGoal>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.6.1</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <version>3.0.2</version>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/samples/kafka/akka/consumer/src/main/java/com/example/app/AkkaTestConsumer.java b/samples/kafka/akka/consumer/src/main/java/com/example/app/AkkaTestConsumer.java
deleted file mode 100644
index 7d5dfd4d..00000000
--- a/samples/kafka/akka/consumer/src/main/java/com/example/app/AkkaTestConsumer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-import akka.Done;
-import akka.actor.ActorSystem;
-import akka.kafka.*;
-import akka.stream.ActorMaterializer;
-import akka.stream.Materializer;
-import akka.stream.javadsl.*;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.atomic.AtomicLong;
-
-abstract class AbstractConsumer {
-    protected final ActorSystem system = ActorSystem.create("example");
-
-    protected final Materializer materializer = ActorMaterializer.create(system);
-
-    // Consumer settings
-    protected ConsumerSettings<byte[], String> consumerSettings = ConsumerSettings
-            .create(system, new ByteArrayDeserializer(), new StringDeserializer());
-
-    // DB
-    static class DB {
-        private final AtomicLong offset = new AtomicLong();
-
-        public CompletionStage<Done> save(ConsumerRecord<byte[], String> record) {
-            System.out.println("DB.save: " + record.value());
-            offset.set(record.offset());
-            return CompletableFuture.completedFuture(Done.getInstance());
-        }
-
-        public CompletionStage<Long> loadOffset() {
-            return CompletableFuture.completedFuture(offset.get());
-        }
-
-        public CompletionStage<Done> update(String data) {
-            System.out.println(data);
-            return CompletableFuture.completedFuture(Done.getInstance());
-        }
-    }
-}
-
-
-public class AkkaTestConsumer extends AbstractConsumer {
-
-    private final static String TOPIC = "test";
-
-    public static void main(String[] args) {
-        new AkkaTestConsumer().demo();
-    }
-
-    //Consumes each message from TOPIC at least once
-    public void demo() {
-        final DB db = new DB();
-        akka.kafka.javadsl.Consumer.committableSource(consumerSettings, Subscriptions.topics(TOPIC))
-                .mapAsync(1, msg -> db.update(msg.record().value()).thenApply(done -> msg))
-                .mapAsync(1, msg -> msg.committableOffset().commitJavadsl())
-                .runWith(Sink.foreach(p -> System.out.println(p)), materializer);
-    }
-}
diff --git a/samples/kafka/akka/consumer/src/main/resources/application.conf b/samples/kafka/akka/consumer/src/main/resources/application.conf
deleted file mode 100644
index 30256678..00000000
--- a/samples/kafka/akka/consumer/src/main/resources/application.conf
+++ /dev/null
@@ -1,16 +0,0 @@
-akka.kafka.consumer {
- #Akka Kafka consumer properties defined here
- wakeup-timeout=60s
-
- # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
- # can be defined in this configuration section.
- kafka-clients {
- request.timeout.ms=60000
- group.id=akka-example-consumer
-
- bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
- sasl.mechanism=PLAIN
- security.protocol=SASL_SSL
- sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
- }
-}
diff --git a/samples/kafka/akka/producer/pom.xml b/samples/kafka/akka/producer/pom.xml
deleted file mode 100644
index 55219297..00000000
--- a/samples/kafka/akka/producer/pom.xml
+++ /dev/null
@@ -1,57 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>com.example.app</groupId>
-    <artifactId>event-hubs-kafka-akka-producer</artifactId>
-    <packaging>jar</packaging>
-    <version>1.0-SNAPSHOT</version>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    </properties>
-    <dependencies>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-actor_2.12</artifactId>
-            <version>2.5.16</version>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-stream-kafka_2.12</artifactId>
-            <version>0.20</version>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-testkit_2.12</artifactId>
-            <version>2.5.16</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <defaultGoal>install</defaultGoal>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.6.1</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <version>3.0.2</version>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/samples/kafka/akka/producer/src/main/java/com/example/app/AkkaTestProducer.java b/samples/kafka/akka/producer/src/main/java/com/example/app/AkkaTestProducer.java
deleted file mode 100644
index 6e6b1b2f..00000000
--- a/samples/kafka/akka/producer/src/main/java/com/example/app/AkkaTestProducer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-import akka.Done;
-import akka.actor.ActorSystem;
-import akka.kafka.ProducerSettings;
-import akka.kafka.javadsl.Producer;
-import akka.stream.ActorMaterializer;
-import akka.stream.Materializer;
-import akka.stream.javadsl.Source;
-import akka.stream.StreamLimitReachedException;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import java.util.concurrent.CompletionStage;
-
-abstract class AbstractProducer {
-    protected final ActorSystem system = ActorSystem.create("example");
-
-    protected final Materializer materializer = ActorMaterializer.create(system);
-
-    protected final ProducerSettings<byte[], String> producerSettings = ProducerSettings
-            .create(system, new ByteArraySerializer(), new StringSerializer());
-
-    protected final KafkaProducer<byte[], String> kafkaProducer = producerSettings.createKafkaProducer();
-
-    protected final static int RANGE = 100;
-
-    protected void terminateWhenDone(CompletionStage<Done> result) {
-        result.exceptionally(e -> {
-            if (e instanceof StreamLimitReachedException) {
-                System.out.println("Sent " + RANGE + " messages!");
-                system.terminate();
-            }
-            else
-                system.log().error(e, e.getMessage());
-            return Done.getInstance();
-        })
-        .thenAccept(d -> system.terminate());
-    }
-}
-
-public class AkkaTestProducer extends AbstractProducer {
-
-    private static final String TOPIC = "test";
-
-    public static void main(String[] args) {
-        new AkkaTestProducer().demo();
-    }
-
-    public void demo() {
-        System.out.println("Sending");
-        // Sends integers 1-100 to Kafka topic "test"
-        CompletionStage<Done> done = Source
-            .range(1, RANGE)
-            .limit(RANGE - 1)
-            .map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>(TOPIC, elem))
-            .runWith(Producer.plainSink(producerSettings, kafkaProducer), materializer);
-        terminateWhenDone(done);
-    }
-}
diff --git a/samples/kafka/akka/producer/src/main/resources/application.conf b/samples/kafka/akka/producer/src/main/resources/application.conf
deleted file mode 100644
index 17de5436..00000000
--- a/samples/kafka/akka/producer/src/main/resources/application.conf
+++ /dev/null
@@ -1,13 +0,0 @@
-akka.kafka.producer {
- #Akka kafka producer properties can be defined here
-
-
- # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
- # can be defined in this configuration section.
- kafka-clients {
- bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
- sasl.mechanism=PLAIN
- security.protocol=SASL_SSL
- sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
- }
-}
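
The removed akka Java samples were plain Maven projects; they were typically built and run along these lines (a sketch — the exec-maven-plugin invocation is an assumption, since the poms above don't declare it):

    $ cd samples/kafka/akka/producer
    $ mvn clean package
    $ mvn exec:java -Dexec.mainClass="AkkaTestProducer"
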
diff --git a/samples/kafka/flink/README.md b/samples/kafka/flink/README.md
deleted file mode 100644
index 07d917ef..00000000
--- a/samples/kafka/flink/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# We've moved!
-
-All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
-
-The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.
-
diff --git a/samples/kafka/flink/consumer/pom.xml b/samples/kafka/flink/consumer/pom.xml
deleted file mode 100644
index 7e6ede2f..00000000
--- a/samples/kafka/flink/consumer/pom.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>com.example.app</groupId>
-    <artifactId>event-hubs-kafka-flink-consumer</artifactId>
-    <packaging>jar</packaging>
-    <version>1.0-SNAPSHOT</version>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    </properties>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
-            <version>1.4.2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>1.4.2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.11</artifactId>
-            <version>1.4.2</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <defaultGoal>install</defaultGoal>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.6.1</version>
-                <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <version>3.0.2</version>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/samples/kafka/flink/consumer/src/main/java/com/example/app/FlinkTestConsumer.java b/samples/kafka/flink/consumer/src/main/java/com/example/app/FlinkTestConsumer.java
deleted file mode 100644
index 346487ef..00000000
--- a/samples/kafka/flink/consumer/src/main/java/com/example/app/FlinkTestConsumer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; //Kafka v0.11.0.0
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.util.Properties;
-
-public class FlinkTestConsumer {
-
-    private static final String TOPIC = "test";
-    private static final String FILE_PATH = "src/main/resources/consumer.config";
-
-    public static void main(String... args) {
-        try {
-            //Load properties from config file
-            Properties properties = new Properties();
-            properties.load(new FileReader(FILE_PATH));
-
-            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<String>(TOPIC, new SimpleStringSchema(), properties));
-            stream.print();
-            env.execute("Testing flink consumer");
-
-        } catch (FileNotFoundException e) {
-            System.out.println("FileNotFoundException: " + e);
-        } catch (Exception e) {
-            System.out.println("Failed with exception " + e);
-        }
-    }
-}
diff --git a/samples/kafka/flink/consumer/src/main/resources/consumer.config b/samples/kafka/flink/consumer/src/main/resources/consumer.config
deleted file mode 100644
index cd2518c6..00000000
--- a/samples/kafka/flink/consumer/src/main/resources/consumer.config
+++ /dev/null
@@ -1,7 +0,0 @@
-bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
-group.id=FlinkExampleConsumer
-request.timeout.ms=60000
-
-sasl.mechanism=PLAIN
-security.protocol=SASL_SSL
-sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
diff --git a/samples/kafka/flink/producer/pom.xml b/samples/kafka/flink/producer/pom.xml
deleted file mode 100644
index 1668bf3f..00000000
--- a/samples/kafka/flink/producer/pom.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>com.example.app</groupId>
-    <artifactId>event-hubs-kafka-flink-producer</artifactId>
-    <packaging>jar</packaging>
-    <version>1.0-SNAPSHOT</version>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    </properties>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
-            <version>1.4.2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>1.4.2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.11</artifactId>
-            <version>1.4.2</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <defaultGoal>install</defaultGoal>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.6.1</version>
-                <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <version>3.0.2</version>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/samples/kafka/flink/producer/src/main/java/com/example/app/FlinkTestProducer.java b/samples/kafka/flink/producer/src/main/java/com/example/app/FlinkTestProducer.java
deleted file mode 100644
index 45c3a24f..00000000
--- a/samples/kafka/flink/producer/src/main/java/com/example/app/FlinkTestProducer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; //v0.11.0.0
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.util.Properties;
-
-public class FlinkTestProducer {
-
-    private static final String TOPIC = "test";
-    private static final String FILE_PATH = "src/main/resources/producer.config";
-
-    public static void main(String... args) {
-        try {
-            Properties properties = new Properties();
-            properties.load(new FileReader(FILE_PATH));
-
-            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            DataStream<String> stream = createStream(env);
-            FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
-                    TOPIC,
-                    new SimpleStringSchema(), // serialization schema
-                    properties);
-
-            stream.addSink(myProducer);
-            env.execute("Testing flink print");
-
-        } catch (FileNotFoundException e) {
-            System.out.println("FileNotFoundException: " + e);
-        } catch (Exception e) {
-            System.out.println("Failed with exception: " + e);
-        }
-    }
-
-    public static DataStream<String> createStream(StreamExecutionEnvironment env) {
-        return env.generateSequence(0, 200)
-                .map(new MapFunction<Long, String>() {
-                    @Override
-                    public String map(Long in) {
-                        return "FLINK PRODUCE " + in;
-                    }
-                });
-    }
-}
diff --git a/samples/kafka/flink/producer/src/main/resources/producer.config b/samples/kafka/flink/producer/src/main/resources/producer.config
deleted file mode 100644
index eb9c2a9d..00000000
--- a/samples/kafka/flink/producer/src/main/resources/producer.config
+++ /dev/null
@@ -1,6 +0,0 @@
-bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
-client.id=FlinkExampleProducer
-
-sasl.mechanism=PLAIN
-security.protocol=SASL_SSL
-sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
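
The removed Flink samples followed the same Maven pattern; a run sketch (again assuming an exec-maven-plugin invocation, which the poms above don't declare):

    $ cd samples/kafka/flink/producer
    $ mvn clean package
    $ mvn exec:java -Dexec.mainClass="FlinkTestProducer"
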
diff --git a/samples/kafka/mirror-maker/README.md b/samples/kafka/mirror-maker/README.md
deleted file mode 100644
index 07d917ef..00000000
--- a/samples/kafka/mirror-maker/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# We've moved!
-
-All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
-
-The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.
-
diff --git a/samples/kafka/mirror-maker/mirror-eventhub.config b/samples/kafka/mirror-maker/mirror-eventhub.config
deleted file mode 100644
index 21edeb27..00000000
--- a/samples/kafka/mirror-maker/mirror-eventhub.config
+++ /dev/null
@@ -1,8 +0,0 @@
-# Event Hubs Kafka endpoint
-bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
-client.id=mirror_maker_producer
-
-# Event Hubs requires secure communication
-sasl.mechanism=PLAIN
-security.protocol=SASL_SSL
-sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
\ No newline at end of file
diff --git a/samples/kafka/mirror-maker/mirror-maker-graphic.PNG b/samples/kafka/mirror-maker/mirror-maker-graphic.PNG
deleted file mode 100644
index de3a6f38..00000000
Binary files a/samples/kafka/mirror-maker/mirror-maker-graphic.PNG and /dev/null differ
diff --git a/samples/kafka/mirror-maker/source-kafka.config b/samples/kafka/mirror-maker/source-kafka.config
deleted file mode 100644
index f7231393..00000000
--- a/samples/kafka/mirror-maker/source-kafka.config
+++ /dev/null
@@ -1,5 +0,0 @@
-#Simple Kafka set up
-bootstrap.servers={SOURCE.KAFKA.IP.ADDRESS1}:{SOURCE.KAFKA.PORT1},{SOURCE.KAFKA.IP.ADDRESS2}:{SOURCE.KAFKA.PORT2},etc
-client.id=mirror_maker_consumer
-group.id=example-mirrormaker-group
-exclude.internal.topics=true
\ No newline at end of file
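
These two config files were fed to Kafka's stock MirrorMaker tool; the usual invocation looks like this (a sketch, assuming a local Kafka distribution — the flags are standard kafka-mirror-maker options):

    $ bin/kafka-mirror-maker.sh --consumer.config source-kafka.config \
          --producer.config mirror-eventhub.config --whitelist=".*"
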
diff --git a/samples/kafka/node/README.md b/samples/kafka/node/README.md
deleted file mode 100644
index 07d917ef..00000000
--- a/samples/kafka/node/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# We've moved!
-
-All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
-
-The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.
-
diff --git a/samples/kafka/node/consumer.js b/samples/kafka/node/consumer.js
deleted file mode 100644
index a5cc43f6..00000000
--- a/samples/kafka/node/consumer.js
+++ /dev/null
@@ -1,35 +0,0 @@
-var Transform = require('stream').Transform;
-var Kafka = require('node-rdkafka');
-
-var stream = Kafka.KafkaConsumer.createReadStream({
-    'metadata.broker.list': 'EVENTHUB_FQDN',
-    'group.id': 'EVENTHUB_CONSUMER_GROUP', //The default for Event Hubs is $Default
-    'socket.keepalive.enable': true,
-    'enable.auto.commit': false,
-    'security.protocol': 'SASL_SSL',
-    'sasl.mechanisms': 'PLAIN',
-    'sasl.username': '$ConnectionString',
-    'sasl.password': 'EVENTHUB_CONNECTION_STRING'
-
-}, {}, {
-    topics: 'test',
-    waitInterval: 0,
-    objectMode: false
-});
-
-stream.on('error', function (err) {
-    if (err) console.log(err);
-    process.exit(1);
-});
-
-stream
-    .pipe(process.stdout);
-
-stream.on('error', function (err) {
-    console.log(err);
-    process.exit(1);
-});
-
-stream.consumer.on('event.error', function (err) {
-    console.log(err);
-})
\ No newline at end of file
diff --git a/samples/kafka/node/package-lock.json b/samples/kafka/node/package-lock.json
deleted file mode 100644
index 4c47f69c..00000000
--- a/samples/kafka/node/package-lock.json
+++ /dev/null
@@ -1,27 +0,0 @@
-{
- "name": "kafka_node",
- "version": "1.0.0",
- "lockfileVersion": 1,
- "requires": true,
- "dependencies": {
- "bindings": {
- "version": "1.3.0",
- "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz",
- "integrity": "sha512-DpLh5EzMR2kzvX1KIlVC0VkC3iZtHKTgdtZ0a3pglBZdaQFjt5S9g9xd1lE+YvXyfd6mtCeRnrUfOLYiTMlNSw=="
- },
- "nan": {
- "version": "2.10.0",
- "resolved": "https://registry.npmjs.org/nan/-/nan-2.10.0.tgz",
- "integrity": "sha512-bAdJv7fBLhWC+/Bls0Oza+mvTaNQtP+1RyhhhvD95pgUJz6XM5IzgmxOkItJ9tkoCiplvAnXI1tNmmUD/eScyA=="
- },
- "node-rdkafka": {
- "version": "2.3.3",
- "resolved": "https://registry.npmjs.org/node-rdkafka/-/node-rdkafka-2.3.3.tgz",
- "integrity": "sha512-2J54zC9+Zj0iRQttmQs1Ubv8aHhmh04XjP3vk39uco7l6tp8BYYHG4XRsoqKOGGKjBLctGpFHr9g97WBE1pTbg==",
- "requires": {
- "bindings": "1.3.0",
- "nan": "2.10.0"
- }
- }
- }
-}
diff --git a/samples/kafka/node/package.json b/samples/kafka/node/package.json
deleted file mode 100644
index 9feabaa6..00000000
--- a/samples/kafka/node/package.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
- "name": "kafka_node",
- "version": "1.0.0",
- "description": "",
- "main": "index.js",
- "scripts": {
- "test": "echo \"Error: no test specified\" && exit 1"
- },
- "keywords": [],
- "author": "",
- "license": "ISC",
- "dependencies": {
- "node-rdkafka": "^2.3.3"
- }
-}
diff --git a/samples/kafka/node/producer.js b/samples/kafka/node/producer.js
deleted file mode 100644
index faa3c87f..00000000
--- a/samples/kafka/node/producer.js
+++ /dev/null
@@ -1,63 +0,0 @@
-var Kafka = require('node-rdkafka');
-
-var producer = new Kafka.Producer({
-    //'debug' : 'all',
-    'metadata.broker.list': 'EVENTHUB_FQDN',
-    'dr_cb': true, //delivery report callback
-    'security.protocol': 'SASL_SSL',
-    'sasl.mechanisms': 'PLAIN',
-    'sasl.username': '$ConnectionString',
-    'sasl.password': 'EVENTHUB_CONNECTION_STRING'
-});
-
-var topicName = 'test';
-
-//logging debug messages, if debug is enabled
-producer.on('event.log', function(log) {
-    console.log(log);
-});
-
-//logging all errors
-producer.on('event.error', function(err) {
-    console.error('Error from producer');
-    console.error(err);
-});
-
-//counter to stop this sample after maxMessages are sent
-var counter = 0;
-var maxMessages = 10;
-
-producer.on('delivery-report', function(err, report) {
-    console.log('delivery-report: ' + JSON.stringify(report));
-    counter++;
-});
-
-//Wait for the ready event before producing
-producer.on('ready', function(arg) {
-    console.log('producer ready.' + JSON.stringify(arg));
-
-    for (var i = 0; i < maxMessages; i++) {
-        var value = new Buffer(`{"name" : "person${i}"}`);
-        var key = "key-" + i;
-        // if partition is set to -1, librdkafka will use the default partitioner
-        var partition = -1;
-        producer.produce(topicName, partition, value, key);
-    }
-
-    //need to keep polling for a while to ensure the delivery reports are received
-    var pollLoop = setInterval(function() {
-        producer.poll();
-        if (counter === maxMessages) {
-            clearInterval(pollLoop);
-            producer.disconnect();
-        }
-    }, 1000);
-
-});
-
-producer.on('disconnected', function(arg) {
-    console.log('producer disconnected. ' + JSON.stringify(arg));
-});
-
-//starting the producer
-producer.connect();
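
Both Node.js samples ran directly under node once the node-rdkafka dependency from package.json was installed (a sketch, assuming Node.js plus the native build toolchain librdkafka requires):

    $ npm install
    $ node producer.js
    $ node consumer.js
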
diff --git a/samples/kafka/python/README.md b/samples/kafka/python/README.md
deleted file mode 100644
index 07d917ef..00000000
--- a/samples/kafka/python/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# We've moved!
-
-All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
-
-The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.
-
diff --git a/samples/kafka/python/consumer.py b/samples/kafka/python/consumer.py
deleted file mode 100644
index 7ecb5310..00000000
--- a/samples/kafka/python/consumer.py
+++ /dev/null
@@ -1,126 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2016 Confluent Inc.
-#
-# Modified for use with Azure Event Hubs for Apache Kafka Ecosystems
-# 06/18/2018
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#
-# Example high-level Kafka 0.9 balanced Consumer
-#
-from confluent_kafka import Consumer, KafkaException, KafkaError
-import sys
-import getopt
-import json
-import logging
-from pprint import pformat
-
-
-def stats_cb(stats_json_str):
-    stats_json = json.loads(stats_json_str)
-    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))
-
-
-def print_usage_and_exit(program_name):
-    sys.stderr.write('Usage: %s [options..] <group> <topic1> <topic2> ..\n' % program_name)
-    options = '''
- Options:
-  -T <intvl>   Enable client statistics at specified interval (ms)
-'''
-    sys.stderr.write(options)
-    sys.exit(1)
-
-
-if __name__ == '__main__':
-    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
-    if len(argv) < 2:
-        print_usage_and_exit(sys.argv[0])
-
-    group = argv[0]
-    topics = argv[1:]
-    # Consumer configuration
-    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-    conf = {
-        'bootstrap.servers': '{YOUR.EVENTHUB.FQDN}:9093',
-        'security.protocol': 'SASL_SSL',
-        'sasl.mechanism': 'PLAIN',
-        'sasl.username': '$ConnectionString',
-        'sasl.password': '{YOUR.EVENTHUB.CONNECTION.STRING}',
-        'group.id': group,
-        'client.id': 'python-example-consumer',
-        'request.timeout.ms': 60000,
-        'default.topic.config': {'auto.offset.reset': 'smallest'}
-    }
-
-    # Check to see if -T option exists
-    for opt in optlist:
-        if opt[0] != '-T':
-            continue
-        try:
-            intval = int(opt[1])
-        except ValueError:
-            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
-            sys.exit(1)
-
-        if intval <= 0:
-            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
-            sys.exit(1)
-
-        conf['stats_cb'] = stats_cb
-        conf['statistics.interval.ms'] = int(opt[1])
-
-    # Create logger for consumer (logs will be emitted when poll() is called)
-    logger = logging.getLogger('consumer')
-    logger.setLevel(logging.DEBUG)
-    handler = logging.StreamHandler()
-    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
-    logger.addHandler(handler)
-
-    # Create Consumer instance
-    # Hint: try debug='fetch' to generate some log messages
-    c = Consumer(conf, logger=logger)
-
-    def print_assignment(consumer, partitions):
-        print('Assignment:', partitions)
-
-    # Subscribe to topics
-    c.subscribe(topics, on_assign=print_assignment)
-
-    # Read messages from Kafka, print to stdout
-    try:
-        while True:
-            msg = c.poll(timeout=100.0)
-            if msg is None:
-                continue
-            if msg.error():
-                # Error or event
-                if msg.error().code() == KafkaError._PARTITION_EOF:
-                    # End of partition event
-                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
-                                     (msg.topic(), msg.partition(), msg.offset()))
-                else:
-                    # Error
-                    raise KafkaException(msg.error())
-            else:
-                # Proper message
-                print(msg.value())
-
-    except KeyboardInterrupt:
-        sys.stderr.write('%% Aborted by user\n')
-
-    finally:
-        # Close down consumer to commit final offsets.
-        c.close()
diff --git a/samples/kafka/python/producer.py b/samples/kafka/python/producer.py
deleted file mode 100644
index 046bc313..00000000
--- a/samples/kafka/python/producer.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#!/usr/bin/env python
-#
-#Copyright 2016 Confluent Inc.
-#
-# Modified for use with Azure Event Hubs for Apache Kafka Ecosystems
-# 06/18/2018
-#
-#Licensed under the Apache License, Version 2.0 (the "License");
-#you may not use this file except in compliance with the License.
-#You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-#Unless required by applicable law or agreed to in writing, software
-#distributed under the License is distributed on an "AS IS" BASIS,
-#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#See the License for the specific language governing permissions and
-#limitations under the License.
-#
-
-#
-#Example Kafka Producer.
-#
-from confluent_kafka import Producer
-import sys
-
-if __name__ == '__main__':
-    if len(sys.argv) != 2:
-        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
-        sys.exit(1)
-    topic = sys.argv[1]
-
-    #Producer configuration
-    #See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-    #See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
-    conf = {
-        'bootstrap.servers': '{YOUR.EVENTHUB.FQDN}:9093',
-        'security.protocol': 'SASL_SSL',
-        'sasl.mechanism': 'PLAIN',
-        'sasl.username': '$ConnectionString',
-        'sasl.password': '{YOUR.EVENTHUB.CONNECTION.STRING}',
-        'client.id': 'python-example-producer'
-    }
-
-    #Create Producer instance
-    p = Producer(**conf)
-
-    def delivery_callback(err, msg):
-        if err:
-            sys.stderr.write('%% Message failed delivery: %s\n' % err)
-        else:
-            sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset()))
-
-
-    #Write 1-100 to topic
-    for i in range(0, 100):
-        try:
-            p.produce(topic, str(i), callback=delivery_callback)
-        except BufferError as e:
-            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
-        p.poll(0)
-
-    #Wait until all messages have been delivered
-    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
-    p.flush()
-
diff --git a/samples/kafka/python/setup.sh b/samples/kafka/python/setup.sh
deleted file mode 100644
index 601d202a..00000000
--- a/samples/kafka/python/setup.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-echo "Downloading/installing necessary libraries and repos"
-sudo apt-get install git openssl libssl-dev build-essential python-pip python-dev librdkafka-dev
-git clone https://github.com/confluentinc/confluent-kafka-python
-git clone https://github.com/edenhill/librdkafka
-
-echo "Setting up librdkafka"
-cd librdkafka
-./configure
-make
-sudo make install
-cd ..
-
-echo "Setting up Confluent's Python Kafka library"
-#The "/" at the end of confluent-kafka-python is important
-#If there's no "/" pip will try to download a confluent-kafka-python package and fail to find it
-sudo pip install confluent-kafka-python/
-echo "Try running the samples now!"
-
-#Sometimes 'sudo apt-get purge librdkafka1' helps if this script doesn't work initially
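
After setup.sh, the removed Python samples were invoked with the arguments their usage strings describe (sketch; <topic> and <group> are placeholders):

    $ python producer.py <topic>
    $ python consumer.py <group> <topic1> [<topic2> ..]
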
diff --git a/samples/kafka/quickstart/README.md b/samples/kafka/quickstart/README.md
deleted file mode 100644
index 07d917ef..00000000
--- a/samples/kafka/quickstart/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# We've moved!
-
-All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
-
-The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.
-
diff --git a/samples/kafka/quickstart/consumer/pom.xml b/samples/kafka/quickstart/consumer/pom.xml
deleted file mode 100644
index e141592c..00000000
--- a/samples/kafka/quickstart/consumer/pom.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>com.example.app</groupId>
-    <artifactId>event-hubs-kafka-java-consumer</artifactId>
-    <packaging>jar</packaging>
-    <version>1.0-SNAPSHOT</version>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>1.0.0</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <defaultGoal>install</defaultGoal>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.6.1</version>
-                <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <version>3.0.2</version>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/samples/kafka/quickstart/consumer/src/main/java/com/example/app/TestConsumer.java b/samples/kafka/quickstart/consumer/src/main/java/com/example/app/TestConsumer.java
deleted file mode 100644
index 2989d359..00000000
--- a/samples/kafka/quickstart/consumer/src/main/java/com/example/app/TestConsumer.java
+++ /dev/null
@@ -1,18 +0,0 @@
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class TestConsumer {
-    //Change constant to consume from the desired topic
-    private final static String TOPIC = "test";
-
-    private final static int NUM_THREADS = 1;
-
-    public static void main(String... args) throws Exception {
-
-        final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
-
-        for (int i = 0; i < NUM_THREADS; i++) {
-            executorService.execute(new TestConsumerThread(TOPIC));
-        }
-    }
-}
diff --git a/samples/kafka/quickstart/consumer/src/main/java/com/example/app/TestConsumerThread.java b/samples/kafka/quickstart/consumer/src/main/java/com/example/app/TestConsumerThread.java
deleted file mode 100644
index d0b2dcef..00000000
--- a/samples/kafka/quickstart/consumer/src/main/java/com/example/app/TestConsumerThread.java
+++ /dev/null
@@ -1,70 +0,0 @@
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import java.io.FileReader;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-
-public class TestConsumerThread implements Runnable {
-
-    private final String TOPIC;
-
-    //Each consumer needs a unique client ID per thread
-    private static int id = 0;
-
-    public TestConsumerThread(final String TOPIC) {
-        this.TOPIC = TOPIC;
-    }
-
-    public void run() {
-        final Consumer<Long, String> consumer = createConsumer();
-        System.out.println("Polling");
-
-        try {
-            while (true) {
-                final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
-                for (ConsumerRecord<Long, String> cr : consumerRecords) {
-                    System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", cr.key(), cr.value(), cr.partition(), cr.offset());
-                }
-                consumer.commitAsync();
-            }
-        } catch (CommitFailedException e) {
-            System.out.println("CommitFailedException: " + e);
-        } finally {
-            consumer.close();
-        }
-    }
-
-    private Consumer<Long, String> createConsumer() {
-        try {
-            final Properties properties = new Properties();
-            synchronized (TestConsumerThread.class) {
-                properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "KafkaExampleConsumer#" + id);
-                id++;
-            }
-            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-            //Get remaining properties from config file
-            properties.load(new FileReader("src/main/resources/consumer.config"));
-
-            // Create the consumer using properties.
-            final Consumer<Long, String> consumer = new KafkaConsumer<>(properties);
-
-            // Subscribe to the topic.
-            consumer.subscribe(Collections.singletonList(TOPIC));
-            return consumer;
-
-        } catch (FileNotFoundException e) {
-            System.out.println("FileNotFoundException: " + e);
-            System.exit(1);
-            return null; //unreachable
-        } catch (IOException e) {
-            System.out.println("IOException: " + e);
-            System.exit(1);
-            return null; //unreachable
-        }
-    }
-}
diff --git a/samples/kafka/quickstart/consumer/src/main/resources/consumer.config b/samples/kafka/quickstart/consumer/src/main/resources/consumer.config
deleted file mode 100644
index 773f35f8..00000000
--- a/samples/kafka/quickstart/consumer/src/main/resources/consumer.config
+++ /dev/null
@@ -1,6 +0,0 @@
-bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
-group.id=$Default
-request.timeout.ms=60000
-security.protocol=SASL_SSL
-sasl.mechanism=PLAIN
-sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
diff --git a/samples/kafka/quickstart/producer/pom.xml b/samples/kafka/quickstart/producer/pom.xml
deleted file mode 100644
index 2693e536..00000000
--- a/samples/kafka/quickstart/producer/pom.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>com.example.app</groupId>
-    <artifactId>event-hubs-kafka-java-producer</artifactId>
-    <packaging>jar</packaging>
-    <version>1.0-SNAPSHOT</version>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>1.0.0</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <defaultGoal>install</defaultGoal>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.6.1</version>
-                <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <version>3.0.2</version>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java b/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java
deleted file mode 100644
index e12efaee..00000000
--- a/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestDataReporter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import java.sql.Timestamp;
-
-public class TestDataReporter implements Runnable {
-
-    private static final int NUM_MESSAGES = 100;
-    private final String TOPIC;
-
-    private Producer<Long, String> producer;
-
-    public TestDataReporter(final Producer<Long, String> producer, String TOPIC) {
-        this.producer = producer;
-        this.TOPIC = TOPIC;
-    }
-
-    @Override
-    public void run() {
-        for (int i = 0; i < NUM_MESSAGES; i++) {
-            long time = System.currentTimeMillis();
-            System.out.println("Test Data #" + i + " from thread #" + Thread.currentThread().getId());
-
-            final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, time, "Test Data #" + i);
-            producer.send(record, new Callback() {
-                public void onCompletion(RecordMetadata metadata, Exception exception) {
-                    if (exception != null) {
-                        System.out.println(exception);
-                        System.exit(1);
-                    }
-                }
-            });
-        }
-        System.out.println("Finished sending " + NUM_MESSAGES + " messages from thread #" + Thread.currentThread().getId() + "!");
-    }
-}
diff --git a/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestProducer.java b/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestProducer.java
deleted file mode 100644
index 5ebe5ba3..00000000
--- a/samples/kafka/quickstart/producer/src/main/java/com/example/app/TestProducer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import java.util.Properties;
-import java.io.FileReader;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-
-public class TestProducer {
-    //Change constant to send messages to the desired topic, for this example we use 'test'
-    private final static String TOPIC = "test";
-
-    private final static int NUM_THREADS = 1;
-
-
-    public static void main(String... args) throws Exception {
-        //Create Kafka Producer
-        final Producer<Long, String> producer = createProducer();
-
-        final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
-
-        //Run NUM_THREADS TestDataReporters
-        for (int i = 0; i < NUM_THREADS; i++)
-            executorService.execute(new TestDataReporter(producer, TOPIC));
-    }
-
-    private static Producer<Long, String> createProducer() {
-        try {
-            Properties properties = new Properties();
-            properties.load(new FileReader("src/main/resources/producer.config"));
-            properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
-            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
-            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-            return new KafkaProducer<>(properties);
-        } catch (Exception e) {
-            System.out.println("Failed to create producer with exception: " + e);
-            System.exit(0);
-            return null; //unreachable
-        }
-    }
-}
-
-
diff --git a/samples/kafka/quickstart/producer/src/main/resources/producer.config b/samples/kafka/quickstart/producer/src/main/resources/producer.config
deleted file mode 100644
index cabaca25..00000000
--- a/samples/kafka/quickstart/producer/src/main/resources/producer.config
+++ /dev/null
@@ -1,4 +0,0 @@
-bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
-security.protocol=SASL_SSL
-sasl.mechanism=PLAIN
-sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
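
Like the other Java samples in this patch, the removed quickstart projects were built with Maven; a run sketch (once more assuming an exec-maven-plugin invocation, which the quickstart poms don't declare):

    $ cd samples/kafka/quickstart/producer
    $ mvn clean package
    $ mvn exec:java -Dexec.mainClass="TestProducer"
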