This commit is contained in:
Ohad Bitton 2020-05-06 13:20:51 +03:00
Parent f8271ec6c1
Commit cc4056474d
6 changed files: 44 additions and 30 deletions

View file

@@ -8,7 +8,7 @@
     <groupId>com.microsoft.azure</groupId>
     <artifactId>kafka-sink-azure-kusto</artifactId>
     <packaging>jar</packaging>
-    <version>0.3.3</version>
+    <version>0.3.4</version>
    <build>
        <plugins>
            <plugin>
@@ -71,7 +71,7 @@
        <kafka.version>1.0.0</kafka.version>
        <json.version>20090211</json.version>
        <commonio.version>2.6</commonio.version>
-       <kusto-sdk.version>1.4.2</kusto-sdk.version>
+       <kusto-sdk.version>2.0.2</kusto-sdk.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>

View file

@@ -2,7 +2,7 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
 import java.io.File;

-public class FileDescriptor {
+public class FileProperties {
     long rawBytes = 0;
     long zippedBytes = 0;
     long numRecords = 0;

View file

@@ -4,13 +4,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.*;
+import java.util.Objects;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.zip.GZIPOutputStream;

 /**
  * This class is used to write gzipped rolling files.
  * Currently supports size based rolling, where size is for *uncompressed* size,
@@ -19,9 +19,9 @@ import java.util.zip.GZIPOutputStream;
 public class FileWriter implements Closeable {

     private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
-    FileDescriptor currentFile;
+    FileProperties currentFile;
     private Timer timer;
-    private Consumer<FileDescriptor> onRollCallback;
+    private Consumer<FileProperties> onRollCallback;
     private final long flushInterval;
     private final boolean shouldCompressData;
     private Supplier<String> getFilePath;
@@ -29,6 +29,7 @@ public class FileWriter implements Closeable {
     private String basePath;
     private CountingOutputStream countingStream;
     private long fileThreshold;
+    private FileDescriptor currentFd;

     /**
      * @param basePath - This is path to which to write the files to.
@@ -39,7 +40,7 @@
      */
     public FileWriter(String basePath,
                       long fileThreshold,
-                      Consumer<FileDescriptor> onRollCallback,
+                      Consumer<FileProperties> onRollCallback,
                       Supplier<String> getFilePath,
                       long flushInterval,
                       boolean shouldCompressData) {
@@ -76,7 +77,7 @@
     }

     public void openFile() throws IOException {
-        FileDescriptor fileDescriptor = new FileDescriptor();
+        FileProperties fileProps = new FileProperties();

         File folder = new File(basePath);
         if (!folder.exists() && !folder.mkdirs()) {
@@ -84,19 +85,20 @@
         }

         String filePath = getFilePath.get();
-        fileDescriptor.path = filePath;
+        fileProps.path = filePath;

         File file = new File(filePath);
         file.createNewFile();

         FileOutputStream fos = new FileOutputStream(file);
+        currentFd = fos.getFD();
         fos.getChannel().truncate(0);

         countingStream = new CountingOutputStream(fos);
         outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream;
-        fileDescriptor.file = file;
-        currentFile = fileDescriptor;
+        fileProps.file = file;
+        currentFile = fileProps;
     }

     void rotate() throws IOException {
@@ -105,6 +107,10 @@
     }

     void finishFile() throws IOException {
+        finishFile(true);
+    }
+
+    void finishFile(Boolean delete) throws IOException {
         if(isDirty()){
             if(shouldCompressData){
                 GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
@@ -114,18 +120,29 @@
             }
             onRollCallback.accept(currentFile);
-            currentFile.file.delete();
-        }
-
-        // closing late so that the success callback will have a chance to use the file. This is a real thing on debug?!
-        outputStream.close();
+            if (delete){
+                dumpFile();
+            }
+        } else {
+            outputStream.close();
+        }
+    }
+
+    private void dumpFile() throws IOException {
+        outputStream.close();
+        currentFd = null;
+        boolean deleted = currentFile.file.delete();
+        if (!deleted) {
+            log.warn("couldn't delete temporary file. File exists: " + currentFile.file.exists());// +
+            // ". If file does not exist please contact kusto team.");
+        }
     }

     public void rollback() throws IOException {
         if (outputStream != null) {
             outputStream.close();
             if (currentFile != null && currentFile.file != null) {
-                currentFile.file.delete();
+                dumpFile();
             }
         }
     }
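The new finishFile(Boolean delete) overload decouples rolling a file from deleting it. A minimal, illustrative sketch of a caller (not part of this commit; the path and the callback body are made-up stand-ins, and it assumes the FileWriter and FileProperties shown above, invoked from the same package since finishFile is package-private):

    // Sketch only: mirrors how the updated TopicPartitionWriterTest uses the overload.
    Consumer<FileProperties> onRoll = (FileProperties f) ->
            System.out.println("rolled " + f.path + " (" + f.rawBytes + " raw bytes)");
    Supplier<String> getFilePath = () -> "/tmp/kafka-kusto-example.csv.gz"; // stand-in path

    FileWriter writer = new FileWriter("/tmp", 128, onRoll, getFilePath, 30000, true);
    writer.openFile();                        // public per the diff above
    writer.write("some,record\n".getBytes()); // may throw IOException

    // Rolls the file and fires onRoll, but keeps it on disk so the caller can read it back;
    // finishFile() now delegates to finishFile(true), where dumpFile() closes and deletes it.
    writer.finishFile(false);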

View file

@@ -40,7 +40,7 @@ public class TopicPartitionWriter {
         this.eventDataCompression = ingestionProps.eventDataCompression;
     }

-    public void handleRollFile(FileDescriptor fileDescriptor) {
+    public void handleRollFile(FileProperties fileDescriptor) {
         FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);
         try {

View file

@@ -52,7 +52,7 @@ public class FileWriterTest {
         final String FILE_PATH = Paths.get(path, "ABC").toString();
         final int MAX_FILE_SIZE = 128;

-        Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> {};
+        Consumer<FileProperties> trackFiles = (FileProperties f) -> {};

         Supplier<String> generateFileName = () -> FILE_PATH;
@@ -82,9 +82,9 @@ public class FileWriterTest {
         final int MAX_FILE_SIZE = 100;

-        Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
+        Consumer<FileProperties> trackFiles = (FileProperties f) -> files.put(f.path, f.rawBytes);

-        Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString();
+        Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";

         FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
@@ -121,9 +121,9 @@ public class FileWriterTest {
         final int MAX_FILE_SIZE = 128 * 2;

-        Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
+        Consumer<FileProperties> trackFiles = (FileProperties f) -> files.put(f.path, f.rawBytes);

-        Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString();
+        Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";

         // Expect no files to be ingested as size is small and flushInterval is big
         FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
@@ -177,7 +177,7 @@ public class FileWriterTest {
         GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
         String msg = "Message";

-        Consumer<FileDescriptor> trackFiles = getAssertFileConsumer(msg);
+        Consumer<FileProperties> trackFiles = getAssertFileConsumer(msg);

         Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
@@ -189,11 +189,11 @@ public class FileWriterTest {
         fileWriter.write(byteArrayOutputStream.toByteArray());

         fileWriter.close();
-        Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
+        Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
     }

-    static Consumer<FileDescriptor> getAssertFileConsumer(String msg) {
-        return (FileDescriptor f) -> {
+    static Consumer<FileProperties> getAssertFileConsumer(String msg) {
+        return (FileProperties f) -> {
             try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
                 byte[] bytes = IOUtils.toByteArray(fileInputStream);
                 try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes);

View file

@@ -2,7 +2,6 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
 import com.microsoft.azure.kusto.ingest.IngestClient;
 import com.microsoft.azure.kusto.ingest.IngestionProperties;
-import com.microsoft.azure.kusto.ingest.source.CompressionType;
 import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.common.TopicPartition;
@@ -19,11 +18,9 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Paths;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
-import java.util.zip.GZIPOutputStream;

 import static org.mockito.Mockito.*;
@@ -64,7 +61,7 @@ public class TopicPartitionWriterTest {
         props.ingestionProperties = ingestionProperties;
         TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, basePath, fileThreshold, flushInterval);

-        FileDescriptor descriptor = new FileDescriptor();
+        FileProperties descriptor = new FileProperties();
         descriptor.rawBytes = 1024;
         descriptor.path = "somepath/somefile";
         descriptor.file = new File ("C://myfile.txt");
@@ -237,8 +234,8 @@ public class TopicPartitionWriterTest {
         Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 16, IngestionProperties.DATA_FORMAT.csv.name())).toString());

         // Read
-        writer.fileWriter.finishFile();
-        Consumer<FileDescriptor> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
+        writer.fileWriter.finishFile(false);
+        Consumer<FileProperties> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
         assertFileConsumer.accept(writer.fileWriter.currentFile);
     }