Merge branch 'master' into addOptionalEngineUrlConfig

This commit is contained in:
Yihezkel Schoenbrun 2020-10-18 10:39:01 +03:00 коммит произвёл GitHub
Родитель e6f366971e 25f91b8dd3
Коммит 0988d031a6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 40 добавлений и 55 удалений

Просмотреть файл

@ -9,10 +9,11 @@
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
<version>1.0.3</version>
<version>1.0.4</version>
<properties>
<kafka.version>1.0.0</kafka.version>
<json.version>20090211</json.version>
<junit.version>4.13.1</junit.version>
<commonio.version>2.6</commonio.version>
<kusto-sdk.version>2.1.0</kusto-sdk.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -148,10 +149,11 @@
<artifactId>commons-io</artifactId>
<version>${commonio.version}</version>
</dependency>
<!-- TODO: We're using both JUnit4 and JUnit 5 (see next dependency). Remove JUnit 4. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>

Просмотреть файл

@ -100,8 +100,12 @@ public class FileWriter implements Closeable {
File folder = new File(basePath);
if (!folder.exists() && !folder.mkdirs()) {
throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
if (!folder.exists()) {
throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
}
log.warn("Couldn't create the directory because it already exists (likely a race condition)");
}
String filePath = getFilePath.apply(offset);
fileProps.path = filePath;
File file = new File(filePath);
@ -123,7 +127,7 @@ public class FileWriter implements Closeable {
}
void finishFile(Boolean delete) throws IOException, DataException {
if(isDirty()){
if (isDirty()) {
recordWriter.commit();
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
gzip.finish();
@ -137,7 +141,7 @@ public class FileWriter implements Closeable {
* Also, throwing/logging the exception with just a message to
* avoid polluting logs with duplicate trace.
*/
handleErrors("Failed to write records to KustoDB.");
handleErrors("Failed to write records to KustoDB.", e);
}
if (delete){
dumpFile();
@ -147,13 +151,13 @@ public class FileWriter implements Closeable {
}
}
private void handleErrors(String message) {
private void handleErrors(String message, Exception e) {
if (KustoSinkConfig.BehaviorOnError.FAIL == behaviorOnError) {
throw new ConnectException(message);
throw new ConnectException(message, e);
} else if (KustoSinkConfig.BehaviorOnError.LOG == behaviorOnError) {
log.error("{}", message);
log.error("{}", message, e);
} else {
log.debug("{}", message);
log.debug("{}", message, e);
}
}
@ -177,9 +181,8 @@ public class FileWriter implements Closeable {
}
public void close() throws IOException, DataException {
if (timer!= null) {
if (timer != null) {
timer.cancel();
timer.purge();
}
// Flush last file, updating index
@ -194,7 +197,6 @@ public class FileWriter implements Closeable {
if (flushInterval > 0) {
if (shouldDestroyTimer) {
if (timer != null) {
timer.purge();
timer.cancel();
}
@ -207,7 +209,7 @@ public class FileWriter implements Closeable {
flushByTimeImpl();
}
};
if(timer != null) {
if (timer != null) {
timer.schedule(t, flushInterval);
}
}
@ -218,11 +220,11 @@ public class FileWriter implements Closeable {
// Flush time interval gets the write lock so that it won't starve
reentrantReadWriteLock.writeLock().lock();
// Lock before the check so that if a writing process just flushed this won't ingest empty files
if (currentFile != null && currentFile.rawBytes > 0) {
if (isDirty()) {
finishFile(true);
}
reentrantReadWriteLock.writeLock().unlock();
resetFlushTimer(false);
reentrantReadWriteLock.writeLock().unlock();
} catch (Exception e) {
String fileName = currentFile == null ? "no file created yet" : currentFile.file.getName();
long currentSize = currentFile == null ? 0 : currentFile.rawBytes;
@ -239,6 +241,7 @@ public class FileWriter implements Closeable {
if (recordWriterProvider == null) {
initializeRecordWriter(record);
}
if (currentFile == null) {
openFile(record.kafkaOffset());
resetFlushTimer(true);

Просмотреть файл

@ -1,66 +1,37 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class KustoSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(KustoSinkConnector.class);
private KustoSinkConfig config;
private String tempDirPath;
@Override
public void start(Map<String, String> props) {
log.info("Starting KustoSinkConnector.");
config = new KustoSinkConfig(props);
createTempDirectory(config.getTempDirPath());
}
/*
* Common temporary directory for the Connector tasks
* to write all temp files for ingestion.
*/
private void createTempDirectory(String tempDirPath) {
String systemTempDirPath = tempDirPath;
String tempDir = "kusto-sink-connector-" + UUID.randomUUID().toString();
Path path = Paths.get(systemTempDirPath, tempDir);
try {
Files.createDirectories(path);
} catch (IOException e) {
throw new ConfigException("Failed to create temp directory=" + tempDir, e);
}
this.tempDirPath = path.toString();
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (maxTasks == 0) {
log.warn("No Connector tasks have been configured.");
}
List<Map<String, String>> configs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(config.originalsStrings());
taskProps.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, tempDirPath);
for (int i = 0; i < maxTasks; i++) {
configs.add(taskProps);
}
@ -70,11 +41,6 @@ public class KustoSinkConnector extends SinkConnector {
@Override
public void stop() {
log.info("Shutting down KustoSinkConnector");
try {
FileUtils.deleteDirectory(new File(tempDirPath));
} catch (IOException e) {
log.error("Unable to delete temporary connector folder {}", config.getTempDirPath());
}
}
@Override

Просмотреть файл

@ -54,9 +54,6 @@ public class KustoSinkTask extends SinkTask {
private KustoSinkConfig config;
protected IngestClient kustoIngestClient;
protected Map<TopicPartition, TopicPartitionWriter> writers;
private long maxFileSize;
private long flushInterval;
private String tempDir;
private boolean isDlqEnabled;
private String dlqTopicName;
private Producer<byte[], byte[]> dlqProducer;

Просмотреть файл

@ -11,16 +11,22 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.commons.io.FileUtils;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -54,7 +60,7 @@ class TopicPartitionWriter {
this.client = client;
this.ingestionProps = ingestionProps.ingestionProperties;
this.fileThreshold = config.getFlushSizeBytes();
this.basePath = config.getTempDirPath();
this.basePath = getTempDirectoryName(config.getTempDirPath());
this.flushInterval = config.getFlushInterval();
this.currentOffset = 0;
this.reentrantReadWriteLock = new ReentrantReadWriteLock(true);
@ -205,6 +211,17 @@ class TopicPartitionWriter {
} catch (Exception e) {
log.error("Failed to close kafka producer={}", e);
}
try {
FileUtils.deleteDirectory(new File(basePath));
} catch (IOException e) {
log.error("Unable to delete temporary connector folder {}", basePath);
}
}
static String getTempDirectoryName(String tempDirPath) {
String tempDir = "kusto-sink-connector-" + UUID.randomUUID().toString();
Path path = Paths.get(tempDirPath, tempDir);
return path.toString();
}
}