зеркало из
1
0
Форкнуть 0
This commit is contained in:
Ohad Bitton 2019-12-04 13:55:19 +02:00
Родитель 77eaf00e67
Коммит 0fcdae8406
6 изменённых файлов: 33 добавлений и 38 удалений

Просмотреть файл

@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,31 +23,32 @@ public class FileWriter implements Closeable {
private Timer timer;
private Consumer<FileDescriptor> onRollCallback;
private final long flushInterval;
private final boolean shouldCompressData;
private Supplier<String> getFilePath;
private OutputStream outputStream;
private String basePath;
private CountingOutputStream fileStream;
private long fileThreshold;
private final CompressionType compressionType;
/**
* @param basePath - This is path to which to write the files to.
* @param fileThreshold - Max size, uncompressed bytes.
* @param onRollCallback - Callback to allow code to execute when rolling a file. Blocking code.
* @param getFilePath - Allow external resolving of file name.
* @param shouldCompressData - Should the FileWriter compress the incoming data
*/
public FileWriter(String basePath,
long fileThreshold,
Consumer<FileDescriptor> onRollCallback,
Supplier<String> getFilePath,
long flushInterval,
CompressionType compressionType) {
boolean shouldCompressData) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;
this.onRollCallback = onRollCallback;
this.flushInterval = flushInterval;
this.compressionType = compressionType;
this.shouldCompressData = shouldCompressData;
}
public boolean isDirty() {
@ -83,7 +83,7 @@ public class FileWriter implements Closeable {
throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
}
String filePath = getFilePath.get() + (compressionType == CompressionType.zip ? ".zip" : ".gz");
String filePath = getFilePath.get();
fileDescriptor.path = filePath;
File file = new File(filePath);
@ -94,7 +94,7 @@ public class FileWriter implements Closeable {
fos.getChannel().truncate(0);
fileStream = new CountingOutputStream(fos);
outputStream = compressionType == null ? new GZIPOutputStream(fileStream) : fileStream;
outputStream = shouldCompressData ? new GZIPOutputStream(fileStream) : fileStream;
fileDescriptor.file = file;
currentFile = fileDescriptor;
@ -107,16 +107,10 @@ public class FileWriter implements Closeable {
private void finishFile() throws IOException {
if (isDirty()) {
if(compressionType == null){
((GZIPOutputStream) outputStream).finish();
} else {
outputStream.flush();
}
outputStream.close();
onRollCallback.accept(currentFile);
}
// closing late so that the success callback will have a chance to use the file.
outputStream.close();
currentFile.file.delete();
}
@ -181,7 +175,7 @@ public class FileWriter implements Closeable {
private class CountingOutputStream extends FilterOutputStream {
private long numBytes = 0;
CountingOutputStream(OutputStream out) throws IOException {
CountingOutputStream(OutputStream out) {
super(out);
}

Просмотреть файл

@ -88,7 +88,7 @@ public class KustoSinkTask extends SinkTask {
String table = mapping.getString("table");
String format = mapping.optString("format");
CompressionType compressionType = StringUtils.isBlank(mapping.optString("compression")) ? null : CompressionType.valueOf(mapping.optString("compression"));
CompressionType compressionType = StringUtils.isBlank(mapping.optString("eventDataCompression")) ? null : CompressionType.valueOf(mapping.optString("eventDataCompression"));
IngestionProperties props = new IngestionProperties(db, table);
@ -115,11 +115,11 @@ public class KustoSinkTask extends SinkTask {
}
}
TableIngestionProperties tableIngestionProperties = new TableIngestionProperties();
tableIngestionProperties.compressionType = compressionType;
tableIngestionProperties.eventDataCompression = compressionType;
tableIngestionProperties.ingestionProperties = props;
result.put(mapping.getString("topic"), tableIngestionProperties);
} catch (Exception ex) {
throw new ConfigException("Malformed topics to kusto ingestion props mappings");
throw new ConfigException("Malformed topics to kusto ingestion props mappings", ex);
}
}

Просмотреть файл

@ -5,5 +5,5 @@ import com.microsoft.azure.kusto.ingest.source.CompressionType;
class TableIngestionProperties {
IngestionProperties ingestionProperties = null;
CompressionType compressionType = null;
CompressionType eventDataCompression = null;
}

Просмотреть файл

@ -29,7 +29,7 @@ public class TopicPartitionWriter {
long currentOffset;
Long lastCommittedOffset;
TopicPartitionWriter(TopicPartition tp, IngestClient client, TableIngestionProperties ingestionProps, String basePath, long fileThreshold, long flushInterval, CompressionType compressionType) {
TopicPartitionWriter(TopicPartition tp, IngestClient client, TableIngestionProperties ingestionProps, String basePath, long fileThreshold, long flushInterval) {
this.tp = tp;
this.client = client;
this.ingestionProps = ingestionProps.ingestionProperties;
@ -37,12 +37,7 @@ public class TopicPartitionWriter {
this.basePath = basePath;
this.flushInterval = flushInterval;
this.currentOffset = 0;
this.compressionType = ingestionProps.compressionType;
}
TopicPartitionWriter(TopicPartition tp, IngestClient client, TableIngestionProperties ingestionProps, String basePath, long fileThreshold, long flushInterval
) {
this(tp, client, ingestionProps, basePath, fileThreshold, flushInterval, null);
this.compressionType = ingestionProps.eventDataCompression;
}
public void handleRollFile(FileDescriptor fileDescriptor) {
@ -59,7 +54,10 @@ public class TopicPartitionWriter {
public String getFilePath() {
long nextOffset = fileWriter != null && fileWriter.isDirty() ? currentOffset + 1 : currentOffset;
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat())).toString();
// Output files are always compressed
String compressionExtension = this.compressionType == null ? "gz" : this.compressionType.toString();
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s.%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
}
public void writeRecord(SinkRecord record) {
@ -99,8 +97,13 @@ public class TopicPartitionWriter {
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString())
|| this.compressionType != null;
fileWriter = new FileWriter(basePath, fileThreshold, this::handleRollFile, this::getFilePath,
flushImmediately ? 0 : flushInterval, this.compressionType);
fileWriter = new FileWriter(
basePath,
fileThreshold,
this::handleRollFile,
this::getFilePath,
flushImmediately ? 0 : flushInterval,
this.compressionType == null);
}
public void close() {

Просмотреть файл

@ -1,7 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.sun.xml.internal.messaging.saaj.util.ByteInputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.After;
@ -58,7 +56,7 @@ public class FileWriterTest {
Supplier<String> generateFileName = () -> FILE_PATH;
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, null);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
fileWriter.openFile();
@ -88,7 +86,7 @@ public class FileWriterTest {
Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString();
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, null);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
@ -128,7 +126,7 @@ public class FileWriterTest {
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString();
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, null);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
String msg = "Message";
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8));
@ -145,7 +143,7 @@ public class FileWriterTest {
Supplier<String> generateFileName2 = () -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, null);
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false);
String msg2 = "Second Message";
@ -205,10 +203,10 @@ public class FileWriterTest {
}
};
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv";
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, CompressionType.gz);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, true);
gzipOutputStream.write(msg.getBytes());
gzipOutputStream.finish();

Просмотреть файл

@ -222,7 +222,7 @@ public class KustoSinkTaskTest {
public void getTable() {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv', 'compression':'gz'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv', 'eventDataCompression':'gz'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
@ -237,7 +237,7 @@ public class KustoSinkTaskTest {
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getTableName(), "table2");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getDataFormat(), "json");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference(), "Mapping");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").compressionType, CompressionType.gz);
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").eventDataCompression, CompressionType.gz);
Assert.assertNull(kustoSinkTask.getIngestionProps("topic3"));
}
}