Merge pull request #228 from vigneshc/updateFlinkDeps

Update Flink dependencies to 1.16
This commit is contained in:
Eric Lam (MSFT) 2023-01-24 19:46:08 -08:00 коммит произвёл GitHub
Родитель fe5931a0a7 a7d9dd5a20
Коммит 1160692be5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 48 добавлений и 38 удалений

Просмотреть файл

@@ -9,22 +9,25 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.16.0</flink.version>
<target.java.version>1.8</target.java.version>
</properties>
<dependencies>
    <!-- Flink client/runtime APIs; 'provided' because the Flink cluster supplies them at runtime. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Universal Kafka connector (replaces the deprecated flink-connector-kafka-0.11_2.11);
         since Flink 1.15 the artifact has no Scala version suffix. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- DataStream API; flink-streaming-java_2.11 was renamed when the Scala suffix was dropped. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
<build>
@@ -35,8 +38,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<!-- Compile for Java 8, matching <target.java.version> in <properties>;
     Flink 1.16 requires Java 8+ (the old 1.7 level is no longer supported). -->
<configuration>
    <source>1.8</source>
    <target>1.8</target>
</configuration>
</plugin>
<plugin>

Просмотреть файл

@@ -1,48 +1,56 @@
package com.example.app;

//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Sample Flink producer that sends a few string records to an Azure Event Hub
 * through the Kafka-compatible endpoint, using the Flink 1.16 {@code KafkaSink}
 * (the pre-1.15 {@code FlinkKafkaProducer011} API was removed).
 *
 * <p>Connection settings (bootstrap.servers, SASL config, ...) are read from
 * {@link #FILE_PATH}; every property in that file is forwarded verbatim to the
 * underlying Kafka producer.
 */
public class FlinkTestProducer {

    // Kafka topic name == Event Hub name; replace the placeholder before running.
    private static final String TOPIC = "<EventhubName>";
    private static final String FILE_PATH = "src/main/resources/producer.config";

    public static void main(String... args) {
        try {
            // load properties from file
            Properties properties = new Properties();
            properties.load(new FileReader(FILE_PATH));

            // set properties: forward every configured key to the Kafka producer.
            KafkaSinkBuilder<String> kafkaSinkBuilder = KafkaSink.<String>builder();
            for (String property : properties.stringPropertyNames()) {
                kafkaSinkBuilder.setProperty(property, properties.getProperty(property));
            }

            // set serializer type: plain UTF-8 strings as record values.
            kafkaSinkBuilder.setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(TOPIC)
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build());

            KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();

            // create stream environment, send simple stream to eventhub.
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> sourceStream = env.fromElements("1", "2", "3");
            sourceStream.sinkTo(kafkaSink);

            // run the job.
            env.execute("Send to eventhub using Kafka api");
        } catch (FileNotFoundException e) {
            // Missing producer.config is the most common setup mistake; report it distinctly.
            System.out.println("FileNotFoundException: " + e);
        } catch (Exception e) {
            System.out.println("Failed with exception:: " + e);
        }
    }
}

Просмотреть файл

@@ -1,6 +1,5 @@
bootstrap.servers=mynamespace.servicebus.windows.net:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX";