This commit is contained in:
Vignesh Chandramohan 2023-01-07 20:06:09 -08:00
Родитель 6780188777
Коммит a7d9dd5a20
2 изменённых файлов: 30 добавлений и 7 удалений

Просмотреть файл

@ -10,8 +10,15 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.16.0</flink.version>
<target.java.version>1.8</target.java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
@ -31,8 +38,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>

Просмотреть файл

@ -1,9 +1,13 @@
package com.example.app;
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
@ -11,25 +15,37 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkTestProducer {
private static final String TOPIC = "test";
private static final String TOPIC = "<EventhubName>";
private static final String FILE_PATH = "src/main/resources/producer.config";
public static void main(String... args) {
try {
// load properties from file
Properties properties = new Properties();
properties.load(new FileReader(FILE_PATH));
KafkaSinkBuilder<Long> kafkaSinkBuilder = KafkaSink.<Long>builder();
// set properties.
KafkaSinkBuilder<String> kafkaSinkBuilder = KafkaSink.<String>builder();
for(String property: properties.stringPropertyNames())
{
kafkaSinkBuilder.setProperty(property, properties.getProperty(property));
}
KafkaSink<Long> kafkaSink = kafkaSinkBuilder.build();
// set serializer type.
kafkaSinkBuilder.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.build());
KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
// create stream environment, send simple stream to eventhub.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> sourceStream = env.fromSequence(0, 200);
DataStream<String> sourceStream = env.fromElements("1", "2", "3");
sourceStream.sinkTo(kafkaSink);
// run the job.
env.execute("Send to eventhub using Kafka api");
} catch(FileNotFoundException e){
System.out.println("FileNotFoundException: " + e);