From 715303d3187fe55fb7f95271de480bf5a8fe3eec Mon Sep 17 00:00:00 2001 From: hasher Date: Wed, 15 Jul 2020 16:47:51 +0530 Subject: [PATCH] Refactored countingOutputStream --- .../connect/sink/CountingOutputStream.java | 37 ----------------- .../kusto/kafka/connect/sink/FileWriter.java | 40 ++++++++++++++++--- .../sink/format/RecordWriterProvider.java | 4 +- .../AvroRecordWriterProvider.java | 3 +- .../ByteRecordWriterProvider.java | 3 +- .../JsonRecordWriterProvider.java | 6 +-- .../StringRecordWriterProvider.java | 4 +- .../formatWriter/AvroRecordWriterTest.java | 4 +- .../ByteArrayWriterProviderTest.java | 9 +---- .../JsonRecordWriterProviderTest.java | 9 +---- .../StringRecordWriterProviderTest.java | 10 ++--- 11 files changed, 50 insertions(+), 79 deletions(-) delete mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/CountingOutputStream.java diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/CountingOutputStream.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/CountingOutputStream.java deleted file mode 100644 index 295bfce..0000000 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/CountingOutputStream.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.microsoft.azure.kusto.kafka.connect.sink; - -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -public class CountingOutputStream extends FilterOutputStream { - public long numBytes = 0; - public OutputStream outputStream; - - public CountingOutputStream(OutputStream out) { - super(out); - this.outputStream = out; - } - - @Override - public void write(int b) throws IOException { - out.write(b); - this.numBytes++; - } - - @Override - public void write(byte[] b) throws IOException { - out.write(b); - this.numBytes += b.length; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - this.numBytes += len; - } - - public OutputStream getOutputStream() { - return outputStream; - } -} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java index e952cde..d3a5e99 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java @@ -17,12 +17,7 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.File; -import java.io.FileDescriptor; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -288,5 +283,38 @@ public class FileWriter implements Closeable { throw new ConnectException(String.format("Invalid Kafka record format, connector does not support %s format. This connector supports Avro, Json with schema, Json without schema, Byte, String format. ",record.valueSchema().type())); } } + + private class CountingOutputStream extends FilterOutputStream { + private long numBytes = 0; + private OutputStream outputStream; + + public CountingOutputStream(OutputStream out) { + super(out); + this.outputStream = out; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + this.numBytes++; + } + + @Override + public void write(byte[] b) throws IOException { + out.write(b); + this.numBytes += b.length; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + this.numBytes += len; + } + + public OutputStream getOutputStream() { + return outputStream; + } + } } + diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/format/RecordWriterProvider.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/format/RecordWriterProvider.java index 3efe06a..757314d 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/format/RecordWriterProvider.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/format/RecordWriterProvider.java @@ -1,9 +1,7 @@ package com.microsoft.azure.kusto.kafka.connect.sink.format; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; - import java.io.OutputStream; public interface RecordWriterProvider { - RecordWriter getRecordWriter(String fileName, CountingOutputStream out); + RecordWriter getRecordWriter(String fileName, OutputStream out); } diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterProvider.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterProvider.java index 2b85da6..2805384 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterProvider.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterProvider.java @@ -1,6 +1,5 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider; import io.confluent.connect.avro.AvroData; @@ -22,7 +21,7 @@ public class AvroRecordWriterProvider implements RecordWriterProvider { private final AvroData avroData = new AvroData(50); @Override - public RecordWriter getRecordWriter(String filename, CountingOutputStream out) { + public RecordWriter getRecordWriter(String filename, OutputStream out) { return new RecordWriter() { final DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>()); Schema schema; diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteRecordWriterProvider.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteRecordWriterProvider.java index f1b038e..e26900e 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteRecordWriterProvider.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteRecordWriterProvider.java @@ -1,6 +1,5 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider; import org.apache.kafka.connect.errors.DataException; @@ -16,7 +15,7 @@ public class ByteRecordWriterProvider implements RecordWriterProvider { private static final Logger log = LoggerFactory.getLogger(ByteRecordWriterProvider.class); @Override - public RecordWriter getRecordWriter(String filename, CountingOutputStream out) { + public RecordWriter getRecordWriter(String filename, OutputStream out) { return new RecordWriter() { @Override diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProvider.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProvider.java index 557a782..5175a74 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProvider.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProvider.java @@ -2,7 +2,6 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider; import org.apache.kafka.connect.data.Struct; @@ -14,6 +13,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -23,8 +23,6 @@ public class JsonRecordWriterProvider implements RecordWriterProvider { private static final String LINE_SEPARATOR = System.lineSeparator(); private static final byte[] LINE_SEPARATOR_BYTES = LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8); - private static final long LINE_SEPARATOR_BYTES_LENGTH - = LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8).length; private final ObjectMapper mapper = new ObjectMapper(); private final JsonConverter converter = new JsonConverter(); @@ -37,7 +35,7 @@ public class JsonRecordWriterProvider implements RecordWriterProvider { } @Override - public RecordWriter getRecordWriter(final String filename, CountingOutputStream out) { + public RecordWriter getRecordWriter(final String filename, OutputStream out) { try { log.debug("Opening record writer for: {}", filename); return new RecordWriter() { diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/StringRecordWriterProvider.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/StringRecordWriterProvider.java index c2df16d..cacae4a 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/StringRecordWriterProvider.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/StringRecordWriterProvider.java @@ -1,6 +1,5 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider; import org.apache.kafka.connect.errors.DataException; @@ -9,12 +8,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; public class StringRecordWriterProvider implements RecordWriterProvider { private static final Logger log = LoggerFactory.getLogger(StringRecordWriterProvider.class); @Override - public RecordWriter getRecordWriter(String filename, CountingOutputStream out) { + public RecordWriter getRecordWriter(String filename, OutputStream out) { return new RecordWriter() { @Override diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterTest.java index 562defe..1ec8d58 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/AvroRecordWriterTest.java @@ -1,6 +1,5 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericData; @@ -14,6 +13,7 @@ import org.junit.Test; import java.io.IOException; import java.io.File; import java.io.FileOutputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.List; @@ -38,7 +38,7 @@ public class AvroRecordWriterTest { File file = new File("abc.avro"); AvroRecordWriterProvider writer = new AvroRecordWriterProvider(); FileOutputStream fos = new FileOutputStream(file); - CountingOutputStream out = new CountingOutputStream(fos); + OutputStream out = fos; RecordWriter rd = writer.getRecordWriter(file.getPath(),out); for(SinkRecord record : records){ rd.write(record); diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteArrayWriterProviderTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteArrayWriterProviderTest.java index efeeddd..5226cf4 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteArrayWriterProviderTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/ByteArrayWriterProviderTest.java @@ -1,16 +1,11 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.io.FileOutputStream; -import java.io.BufferedReader; -import java.io.FileReader; +import java.io.*; import java.util.ArrayList; import java.util.List; @@ -27,7 +22,7 @@ public class ByteArrayWriterProviderTest { File file = new File("abc.bin"); ByteRecordWriterProvider writer = new ByteRecordWriterProvider(); FileOutputStream fos = new FileOutputStream(file); - CountingOutputStream out = new CountingOutputStream(fos); + OutputStream out = fos; RecordWriter rd = writer.getRecordWriter(file.getPath(), out); for(SinkRecord record : records){ rd.write(record); diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProviderTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProviderTest.java index 2f84133..36d290e 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProviderTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProviderTest.java @@ -1,15 +1,10 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.io.FileOutputStream; -import java.io.BufferedReader; -import java.io.FileReader; +import java.io.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -30,7 +25,7 @@ public class JsonRecordWriterProviderTest { File file = new File("abc.json"); JsonRecordWriterProvider jsonWriter = new JsonRecordWriterProvider(); FileOutputStream fos = new FileOutputStream(file); - CountingOutputStream out = new CountingOutputStream(fos); + OutputStream out = fos; RecordWriter rd = jsonWriter.getRecordWriter(file.getPath(), out); for(SinkRecord record : records){ rd.write(record); diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/StringRecordWriterProviderTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/StringRecordWriterProviderTest.java index 61d8c7e..917fa2e 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/StringRecordWriterProviderTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/StringRecordWriterProviderTest.java @@ -1,15 +1,11 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter; -import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream; import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.io.FileOutputStream; -import java.io.BufferedReader; -import java.io.FileReader; + +import java.io.*; import java.util.ArrayList; import java.util.List; @@ -26,7 +22,7 @@ public class StringRecordWriterProviderTest { File file = new File("abc.txt"); StringRecordWriterProvider writer = new StringRecordWriterProvider(); FileOutputStream fos = new FileOutputStream(file); - CountingOutputStream out = new CountingOutputStream(fos); + OutputStream out = fos; RecordWriter rd = writer.getRecordWriter(file.getPath(), out); for(SinkRecord record : records){ rd.write(record);