diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java index 22564fa..f4d7ad1 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java @@ -94,7 +94,7 @@ public class FileWriter implements Closeable { fos.getChannel().truncate(0); countingStream = new CountingOutputStream(fos); - outputStream = shouldCompressData ? new GZIPOutputStream(fos) : countingStream; + outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream; fileDescriptor.file = file; currentFile = fileDescriptor; } @@ -107,7 +107,7 @@ public class FileWriter implements Closeable { void finishFile() throws IOException { if(isDirty()){ if(shouldCompressData){ - GZIPOutputStream gzip = (GZIPOutputStream )outputStream; + GZIPOutputStream gzip = (GZIPOutputStream) outputStream; gzip.finish(); } else { outputStream.flush(); diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java index 98ae72c..1352376 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java @@ -58,18 +58,16 @@ public class TopicPartitionWriter { public String getFilePath() { long nextOffset = fileWriter != null && fileWriter.isDirty() ? currentOffset + 1 : currentOffset; - // Output files are always compressed - String compressionExtension; - if (this.eventDataCompression == null) { - if(shouldCompressData(ingestionProps, null)){ - compressionExtension = ".gz"; - } else { - compressionExtension = ""; - } - } else { - compressionExtension = "." + this.eventDataCompression.toString(); - } + String compressionExtension = ""; + + if (shouldCompressData(ingestionProps, null) || eventDataCompression != null) { + if(eventDataCompression != null) { + compressionExtension = "." + eventDataCompression.toString(); + } else { + compressionExtension = ".gz"; + } + } return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString(); }