Removing outdated Kafka samples. Now in new repo

This commit is contained in:
Basil Hariri 2019-01-03 15:38:02 -08:00
Родитель 78b0d59514
Коммит 55d5d940e5
43 изменённых файлов: 0 добавлений и 1266 удалений

Просмотреть файл

@ -1,6 +0,0 @@
# We've moved!
All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.

Просмотреть файл

@ -1,7 +0,0 @@
name := "scala-event-hubs-consumer"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

Просмотреть файл

@ -1,16 +0,0 @@
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}

Просмотреть файл

@ -1,31 +0,0 @@
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Future
import scala.language.postfixOps
object ConsumerMain {
def main(args: Array[String]): Unit = {
implicit val system:ActorSystem = ActorSystem.apply("akka-stream-kafka")
implicit val materializer:ActorMaterializer = ActorMaterializer()
// grab our settings from the resources/application.conf file
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
// our topic to subscribe to for messages
val topic = "test"
// listen to our topic with our settings, until the program is exited
Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1) ( msg => {
// print out our message once it's received
println(s"Message Received : ${msg.timestamp} - ${msg.value}")
Future.successful(msg)
}).runWith(Sink.ignore)
}
}

Просмотреть файл

@ -1,7 +0,0 @@
name := "scala-event-hubs-producer"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

Просмотреть файл

@ -1,13 +0,0 @@
akka.kafka.producer {
#Akka kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}

Просмотреть файл

@ -1,41 +0,0 @@
import java.util.concurrent.TimeUnit.SECONDS
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps
object ProducerMain {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem.apply("akka-stream-kafka")
implicit val materializer: ActorMaterializer = ActorMaterializer()
// grab our settings from the resources/application.conf file
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
// topic to send the message to on event hubs
val topic = "sampleTopic"
// loop until the program reaches 100
Source(1 to 100)
.throttle(1, FiniteDuration(1, SECONDS), 1, ThrottleMode.Shaping)
.map(num => {
//construct our message here
val message = s"Akka Scala Producer Message # ${num}"
println(s"Message sent to topic - $topic - $message")
new ProducerRecord[Array[Byte], String](topic, message.getBytes, message.toString)
})
.runWith(Producer.plainSink(producerSettings))
.onComplete(_ => {
println("All messages sent!")
system.terminate()
})(scala.concurrent.ExecutionContext.global)
}
}

Просмотреть файл

@ -1,6 +0,0 @@
# We've moved!
All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.

Просмотреть файл

@ -1,57 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-akka-consumer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.16</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_2.12</artifactId>
<version>0.20</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
<version>2.5.16</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -1,61 +0,0 @@
import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.*;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
abstract class AbstractConsumer {
protected final ActorSystem system = ActorSystem.create("example");
protected final Materializer materializer = ActorMaterializer.create(system);
// Consumer settings
protected ConsumerSettings<byte[], String> consumerSettings = ConsumerSettings
.create(system, new ByteArrayDeserializer(), new StringDeserializer());
// DB
static class DB {
private final AtomicLong offset = new AtomicLong();
public CompletionStage<Done> save(ConsumerRecord<byte[], String> record) {
System.out.println("DB.save: " + record.value());
offset.set(record.offset());
return CompletableFuture.completedFuture(Done.getInstance());
}
public CompletionStage<Long> loadOffset() {
return CompletableFuture.completedFuture(offset.get());
}
public CompletionStage<Done> update(String data) {
System.out.println(data);
return CompletableFuture.completedFuture(Done.getInstance());
}
}
}
public class AkkaTestConsumer extends AbstractConsumer {
private final static String TOPIC = "test";
public static void main(String[] args) {
new AkkaTestConsumer().demo();
}
//Consumes each message from TOPIC at least once
public void demo() {
final DB db = new DB();
akka.kafka.javadsl.Consumer.committableSource(consumerSettings, Subscriptions.topics(TOPIC))
.mapAsync(1, msg -> db.update(msg.record().value()).thenApply(done -> msg))
.mapAsync(1, msg -> msg.committableOffset().commitJavadsl())
.runWith(Sink.foreach(p -> System.out.println(p)), materializer);
}
}

Просмотреть файл

@ -1,16 +0,0 @@
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}

Просмотреть файл

@ -1,57 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-akka-producer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.16</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_2.12</artifactId>
<version>0.20</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
<version>2.5.16</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -1,59 +0,0 @@
import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import akka.stream.StreamLimitReachedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.CompletionStage;
abstract class AbstractProducer {
protected final ActorSystem system = ActorSystem.create("example");
protected final Materializer materializer = ActorMaterializer.create(system);
protected final ProducerSettings<byte[], String> producerSettings = ProducerSettings
.create(system, new ByteArraySerializer(), new StringSerializer());
protected final KafkaProducer<byte[], String> kafkaProducer = producerSettings.createKafkaProducer();
protected final static int RANGE = 100;
protected void terminateWhenDone(CompletionStage<Done> result) {
result.exceptionally(e -> {
if (e instanceof StreamLimitReachedException){
System.out.println("Sent " + RANGE + " messages!");
system.terminate();
}
else
system.log().error(e, e.getMessage());
return Done.getInstance();
})
.thenAccept(d -> system.terminate());
}
}
public class AkkaTestProducer extends AbstractProducer {
private static final String TOPIC = "test";
public static void main(String[] args) {
new AkkaTestProducer().demo();
}
public void demo() {
System.out.println("Sending");
// Sends integer 1-100 to Kafka topic "test"
CompletionStage<Done> done = Source
.range(1, RANGE)
.limit(RANGE - 1)
.map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>(TOPIC, elem))
.runWith(Producer.plainSink(producerSettings, kafkaProducer), materializer);
terminateWhenDone(done);
}
}

Просмотреть файл

@ -1,13 +0,0 @@
akka.kafka.producer {
#Akka kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}

Просмотреть файл

@ -1,6 +0,0 @@
# We've moved!
All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.

Просмотреть файл

@ -1,52 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-flink-consumer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.2</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -1,31 +0,0 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; //Kafka v0.11.0.0
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
public class FlinkTestConsumer {
private static final String TOPIC = "test";
private static final String FILE_PATH = "src/main/resources/consumer.config";
public static void main(String... args) {
try {
//Load properties from config file
Properties properties = new Properties();
properties.load(new FileReader(FILE_PATH));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream stream = env.addSource(new FlinkKafkaConsumer011(TOPIC, new SimpleStringSchema(), properties));
stream.print();
env.execute("Testing flink consumer");
} catch(FileNotFoundException e){
System.out.println("FileNoteFoundException: " + e);
} catch (Exception e){
System.out.println("Failed with exception " + e);
}
}
}

Просмотреть файл

@ -1,7 +0,0 @@
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
request.timeout.ms=60000
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

Просмотреть файл

@ -1,52 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-flink-producer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.2</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -1,46 +0,0 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; //v0.11.0.0
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
public class FlinkTestProducer {
private static final String TOPIC = "test";
private static final String FILE_PATH = "src/main/resources/producer.config";
public static void main(String... args) {
try {
Properties properties = new Properties();
properties.load(new FileReader(FILE_PATH));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream stream = createStream(env);
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
TOPIC,
new SimpleStringSchema(), // serialization schema
properties);
stream.addSink(myProducer);
env.execute("Testing flink print");
} catch(FileNotFoundException e){
System.out.println("FileNotFoundException: " + e);
} catch (Exception e) {
System.out.println("Failed with exception:: " + e);
}
}
public static DataStream createStream(StreamExecutionEnvironment env){
return env.generateSequence(0, 200)
.map(new MapFunction<Long, String>() {
@Override
public String map(Long in) {
return "FLINK PRODUCE " + in;
}
});
}
}

Просмотреть файл

@ -1,6 +0,0 @@
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

Просмотреть файл

@ -1,6 +0,0 @@
# We've moved!
All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.

Просмотреть файл

@ -1,8 +0,0 @@
# Event Hubs Kafka endpoint
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=mirror_maker_producer
# Event Hubs requires secure communication
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 40 KiB

Просмотреть файл

@ -1,5 +0,0 @@
#Simple Kafka set up
bootstrap.servers={SOURCE.KAFKA.IP.ADDRESS1}:{SOURCE.KAFKA.PORT1},{SOURCE.KAFKA.IP.ADDRESS2}:{SOURCE.KAFKA.PORT2},etc
client.id=mirror_maker_consumer
group.id=example-mirrormaker-group
exclude.internal.topics=true

Просмотреть файл

@ -1,6 +0,0 @@
# We've moved!
All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.

Просмотреть файл

@ -1,35 +0,0 @@
var Transform = require('stream').Transform;
var Kafka = require('node-rdkafka');
var stream = Kafka.KafkaConsumer.createReadStream({
'metadata.broker.list': 'EVENTHUB_FQDN',
'group.id': 'EVENTHUB_CONSUMER_GROUP', //The default for EventHubs is $Default
'socket.keepalive.enable': true,
'enable.auto.commit': false,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': 'EVENTHUB_CONNECTION_STRING'
}, {}, {
topics: 'test',
waitInterval: 0,
objectMode: false
});
stream.on('error', function (err) {
if (err) console.log(err);
process.exit(1);
});
stream
.pipe(process.stdout);
stream.on('error', function (err) {
console.log(err);
process.exit(1);
});
stream.consumer.on('event.error', function (err) {
console.log(err);
})

27
samples/kafka/node/package-lock.json сгенерированный
Просмотреть файл

@ -1,27 +0,0 @@
{
"name": "kafka_node",
"version": "1.0.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"bindings": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz",
"integrity": "sha512-DpLh5EzMR2kzvX1KIlVC0VkC3iZtHKTgdtZ0a3pglBZdaQFjt5S9g9xd1lE+YvXyfd6mtCeRnrUfOLYiTMlNSw=="
},
"nan": {
"version": "2.10.0",
"resolved": "https://registry.npmjs.org/nan/-/nan-2.10.0.tgz",
"integrity": "sha512-bAdJv7fBLhWC+/Bls0Oza+mvTaNQtP+1RyhhhvD95pgUJz6XM5IzgmxOkItJ9tkoCiplvAnXI1tNmmUD/eScyA=="
},
"node-rdkafka": {
"version": "2.3.3",
"resolved": "https://registry.npmjs.org/node-rdkafka/-/node-rdkafka-2.3.3.tgz",
"integrity": "sha512-2J54zC9+Zj0iRQttmQs1Ubv8aHhmh04XjP3vk39uco7l6tp8BYYHG4XRsoqKOGGKjBLctGpFHr9g97WBE1pTbg==",
"requires": {
"bindings": "1.3.0",
"nan": "2.10.0"
}
}
}
}

Просмотреть файл

@ -1,15 +0,0 @@
{
"name": "kafka_node",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"node-rdkafka": "^2.3.3"
}
}

Просмотреть файл

@ -1,63 +0,0 @@
var Kafka = require('node-rdkafka');
var producer = new Kafka.Producer({
//'debug' : 'all',
'metadata.broker.list': 'EVENTHUB_FQDN',
'dr_cb': true, //delivery report callback
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': 'EVENTHUB_CONNECTION_STRING'
});
var topicName = 'test';
//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
console.log(log);
});
//logging all errors
producer.on('event.error', function(err) {
console.error('Error from producer');
console.error(err);
});
//counter to stop this sample after maxMessages are sent
var counter = 0;
var maxMessages = 10;
producer.on('delivery-report', function(err, report) {
console.log('delivery-report: ' + JSON.stringify(report));
counter++;
});
//Wait for the ready event before producing
producer.on('ready', function(arg) {
console.log('producer ready.' + JSON.stringify(arg));
for (var i = 0; i < maxMessages; i++) {
var value = new Buffer(`{"name" : "person${i}"}"`);
var key = "key-"+i;
// if partition is set to -1, librdkafka will use the default partitioner
var partition = -1;
producer.produce(topicName, partition, value, key);
}
//need to keep polling for a while to ensure the delivery reports are received
var pollLoop = setInterval(function() {
producer.poll();
if (counter === maxMessages) {
clearInterval(pollLoop);
producer.disconnect();
}
}, 1000);
});
producer.on('disconnected', function(arg) {
console.log('producer disconnected. ' + JSON.stringify(arg));
});
//starting the producer
producer.connect();

Просмотреть файл

@ -1,6 +0,0 @@
# We've moved!
All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.

Просмотреть файл

@ -1,126 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Modified for use with Azure Event Hubs for Apache Kafka Ecosystems
# 06/18/2018
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Example high-level Kafka 0.9 balanced Consumer
#
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
import logging
from pprint import pformat
def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))
def print_usage_and_exit(program_name):
sys.stderr.write('Usage: %s [options..] <consumer-group> <topic1> <topic2> ..\n' % program_name)
options = '''
Options:
-T <intvl> Enable client statistics at specified interval (ms)
'''
sys.stderr.write(options)
sys.exit(1)
if __name__ == '__main__':
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
if len(argv) < 2:
print_usage_and_exit(sys.argv[0])
group = argv[0]
topics = argv[1:]
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {
'bootstrap.servers': '{YOUR.EVENTHUB.FQDN}:9093',
'security.protocol' : 'SASL_SSL',
'sasl.mechanism' : 'PLAIN',
'sasl.username' : '$ConnectionString',
'sasl.password' : '{YOUR.EVENTHUB.CONNECTION.STRING}',
'group.id': group,
'client.id': 'python-example-consumer',
'request.timeout.ms': 60000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
}
# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except ValueError:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)
if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)
conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])
# Create logger for consumer (logs will be emitted when poll() is called)
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
logger.addHandler(handler)
# Create Consumer instance
# Hint: try debug='fetch' to generate some log messages
c = Consumer(conf, logger=logger)
def print_assignment(consumer, partitions):
print('Assignment:', partitions)
# Subscribe to topics
c.subscribe(topics, on_assign=print_assignment)
# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=100.0)
if msg is None:
continue
if msg.error():
# Error or event
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
else:
# Error
raise KafkaException(msg.error())
else:
# Proper message
print(msg.value())
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
finally:
# Close down consumer to commit final offsets.
c.close()

Просмотреть файл

@ -1,66 +0,0 @@
#!/usr/bin/env python
#
#Copyright 2016 Confluent Inc.
#
# Modified for use with Azure Event Hubs for Apache Kafka Ecosystems
# 06/18/2018
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
#
#
#Example Kafka Producer.
#
from confluent_kafka import Producer
import sys
if __name__ == '__main__':
if len(sys.argv) != 2:
sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
sys.exit(1)
topic = sys.argv[1]
#Producer configuration
#See https: //github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
#See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
conf = {
'bootstrap.servers': '{YOUR.EVENTHUB.FQDN}:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': '{YOUR.EVENTHUB.CONNECTION.STRING}',
'client.id': 'python-example-producer'
}
#Create Producer instance
p = Producer (**conf)
def delivery_callback (err, msg):
if err:
sys.stderr.write ('%% Message failed delivery: %s\n' % err)
else:
sys.stderr.write ('%% Message delivered to %s [%d] @ %o\n' % (msg.topic (), msg.partition (), msg.offset ()))
#Write 1-100 to topic
for i in range(0,100):
try:
p.produce(topic, str(i), callback=delivery_callback)
except BufferError as e:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len (p))
p.poll (0)
#Wait until all messages have been delivered
sys.stderr.write ('%% Waiting for %d deliveries\n' % len (p))
p.flush ()

Просмотреть файл

@ -1,20 +0,0 @@
#!/bin/bash
echo "Downloading/installing necessary libraries and repos"
sudo apt-get install git openssl libssl-dev build-essential python-pip python-dev librdkafka-dev
git clone https://github.com/confluentinc/confluent-kafka-python
git clone https://github.com/edenhill/librdkafka
echo "Setting up librdkafka"
cd librdkafka
./configure
make
sudo make install
cd ..
echo "Setting up Confluent's Python Kafka library"
#The "/" at the end of confluent-kafka-python is important
#If there's no "/" pip will try to download a confluent-kafka-python package and fail to find it
sudo pip install confluent-kafka-python/
echo "Try running the samples now!"
#Sometimes 'sudo apt-get purge librdkafka1' helps if this script doesn't work initially

Просмотреть файл

@ -1,6 +0,0 @@
# We've moved!
All Event Hubs for Kafka samples and documentation have been moved to https://github.com/Azure/azure-event-hubs-for-kafka. All other samples/docs relating to native (i.e. non-Kafka) Event Hubs SDKs and clients will still live in this repo.
The Event Hubs for Kafka samples/docs in this repo (i.e. this directory and its subdirectories) are outdated and will be removed soon.

Просмотреть файл

@ -1,53 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-java-consumer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<!--v1.0-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!--v1.1-->
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency> -->
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -1,18 +0,0 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestConsumer {
//Change constant to send messages to the desired topic
private final static String TOPIC = "test";
private final static int NUM_THREADS = 1;
public static void main(String... args) throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
for (int i = 0; i < NUM_THREADS; i++){
executorService.execute(new TestConsumerThread(TOPIC));
}
}
}

Просмотреть файл

@ -1,70 +0,0 @@
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.FileReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
public class TestConsumerThread implements Runnable {
private final String TOPIC;
//Each consumer needs a unique client ID per thread
private static int id = 0;
public TestConsumerThread(final String TOPIC){
this.TOPIC = TOPIC;
}
public void run (){
final Consumer<Long, String> consumer = createConsumer();
System.out.println("Polling");
try {
while (true) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
for(ConsumerRecord<Long, String> cr : consumerRecords) {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", cr.key(), cr.value(), cr.partition(), cr.offset());
}
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.out.println("CommitFailedException: " + e);
} finally {
consumer.close();
}
}
private Consumer<Long, String> createConsumer() {
try {
final Properties properties = new Properties();
synchronized (TestConsumerThread.class) {
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "KafkaExampleConsumer#" + id);
id++;
}
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//Get remaining properties from config file
properties.load(new FileReader("src/main/resources/consumer.config"));
// Create the consumer using properties.
final Consumer<Long, String> consumer = new KafkaConsumer<>(properties);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
} catch (FileNotFoundException e){
System.out.println("FileNoteFoundException: " + e);
System.exit(1);
return null; //unreachable
} catch (IOException e){
System.out.println("IOException: " + e);
System.exit(1);
return null; //unreachable
}
}
}

Просмотреть файл

@ -1,6 +0,0 @@
bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
group.id=$Default
request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

Просмотреть файл

@ -1,53 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-java-producer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<!--v1.0-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!--v1.1-->
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency> -->
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -1,37 +0,0 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.sql.Timestamp;
public class TestDataReporter implements Runnable {
private static final int NUM_MESSAGES = 100;
private final String TOPIC;
private Producer<Long, String> producer;
public TestDataReporter(final Producer<Long, String> producer, String TOPIC) {
this.producer = producer;
this.TOPIC = TOPIC;
}
@Override
public void run() {
for(int i = 0; i < NUM_MESSAGES; i++) {
long time = System.currentTimeMillis();
System.out.println("Test Data #" + i + " from thread #" + Thread.currentThread().getId());
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data #" + i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println(exception);
System.exit(1);
}
}
});
}
System.out.println("Finished sending " + NUM_MESSAGES + " messages from thread #" + Thread.currentThread().getId() + "!");
}
}

Просмотреть файл

@ -1,46 +0,0 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.io.FileReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestProducer {
//Change constant to send messages to the desired topic, for this example we use 'test'
private final static String TOPIC = "test";
private final static int NUM_THREADS = 1;
public static void main(String... args) throws Exception {
//Create Kafka Producer
final Producer<Long, String> producer = createProducer();
final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
//Run NUM_THREADS TestDataReporters
for (int i = 0; i < NUM_THREADS; i++)
executorService.execute(new TestDataReporter(producer, TOPIC));
}
private static Producer<Long, String> createProducer() {
try{
Properties properties = new Properties();
properties.load(new FileReader("src/main/resources/producer.config"));
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(properties);
} catch (Exception e){
System.out.println("Failed to create producer with exception: " + e);
System.exit(0);
return null; //unreachable
}
}
}

Просмотреть файл

@ -1,4 +0,0 @@
bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";