Refactored CountingOutputStream

This commit is contained in:
hasher 2020-07-15 16:47:51 +05:30
Родитель 4bdaa588ca
Коммит 715303d318
11 изменённых файлов: 50 добавлений и 79 удалений

Просмотреть файл

@ -1,37 +0,0 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
/**
 * An {@link OutputStream} decorator that counts the number of bytes written
 * through it to the wrapped stream.
 *
 * <p>The {@code numBytes} and {@code outputStream} fields are left public for
 * backward compatibility with existing callers; new code should prefer the
 * {@link #getNumBytes()} and {@link #getOutputStream()} accessors.
 *
 * <p>Not thread-safe: the byte counter is updated without synchronization.
 */
public class CountingOutputStream extends FilterOutputStream {
    /** Running total of bytes written so far. Public for backward compatibility. */
    public long numBytes = 0;
    /** The wrapped stream; same object as the inherited {@code out} field. */
    public OutputStream outputStream;

    /**
     * Creates a counting wrapper around the given stream.
     *
     * @param out the stream that actually receives the bytes
     */
    public CountingOutputStream(OutputStream out) {
        super(out);
        this.outputStream = out;
    }

    /**
     * Writes a single byte and counts it.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(int b) throws IOException {
        out.write(b);
        this.numBytes++;
    }

    /**
     * Writes the whole array directly to the wrapped stream and adds its
     * length to the count once. Overridden because the
     * {@link FilterOutputStream} default would delegate byte-by-byte to
     * {@link #write(int)}, which is slower (though it would count correctly).
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(byte[] b) throws IOException {
        out.write(b);
        this.numBytes += b.length;
    }

    /**
     * Writes {@code len} bytes starting at {@code off} and adds {@code len}
     * to the count.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        this.numBytes += len;
    }

    /** @return the number of bytes written through this stream so far */
    public long getNumBytes() {
        return numBytes;
    }

    /** @return the wrapped output stream */
    public OutputStream getOutputStream() {
        return outputStream;
    }
}

Просмотреть файл

@ -17,12 +17,7 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.*;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@ -288,5 +283,38 @@ public class FileWriter implements Closeable {
throw new ConnectException(String.format("Invalid Kafka record format, connector does not support %s format. This connector supports Avro, Json with schema, Json without schema, Byte, String format. ",record.valueSchema().type()));
}
}
private class CountingOutputStream extends FilterOutputStream {
private long numBytes = 0;
private OutputStream outputStream;
public CountingOutputStream(OutputStream out) {
super(out);
this.outputStream = out;
}
@Override
public void write(int b) throws IOException {
out.write(b);
this.numBytes++;
}
@Override
public void write(byte[] b) throws IOException {
out.write(b);
this.numBytes += b.length;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
this.numBytes += len;
}
public OutputStream getOutputStream() {
return outputStream;
}
}
}

Просмотреть файл

@ -1,9 +1,7 @@
package com.microsoft.azure.kusto.kafka.connect.sink.format;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import java.io.OutputStream;
public interface RecordWriterProvider {
RecordWriter getRecordWriter(String fileName, CountingOutputStream out);
RecordWriter getRecordWriter(String fileName, OutputStream out);
}

Просмотреть файл

@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
import io.confluent.connect.avro.AvroData;
@ -22,7 +21,7 @@ public class AvroRecordWriterProvider implements RecordWriterProvider {
private final AvroData avroData = new AvroData(50);
@Override
public RecordWriter getRecordWriter(String filename, CountingOutputStream out) {
public RecordWriter getRecordWriter(String filename, OutputStream out) {
return new RecordWriter() {
final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
Schema schema;

Просмотреть файл

@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
import org.apache.kafka.connect.errors.DataException;
@ -16,7 +15,7 @@ public class ByteRecordWriterProvider implements RecordWriterProvider {
private static final Logger log = LoggerFactory.getLogger(ByteRecordWriterProvider.class);
@Override
public RecordWriter getRecordWriter(String filename, CountingOutputStream out) {
public RecordWriter getRecordWriter(String filename, OutputStream out) {
return new RecordWriter() {
@Override

Просмотреть файл

@ -2,7 +2,6 @@ package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
import org.apache.kafka.connect.data.Struct;
@ -14,6 +13,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@ -23,8 +23,6 @@ public class JsonRecordWriterProvider implements RecordWriterProvider {
private static final String LINE_SEPARATOR = System.lineSeparator();
private static final byte[] LINE_SEPARATOR_BYTES
= LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8);
private static final long LINE_SEPARATOR_BYTES_LENGTH
= LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8).length;
private final ObjectMapper mapper = new ObjectMapper();
private final JsonConverter converter = new JsonConverter();
@ -37,7 +35,7 @@ public class JsonRecordWriterProvider implements RecordWriterProvider {
}
@Override
public RecordWriter getRecordWriter(final String filename, CountingOutputStream out) {
public RecordWriter getRecordWriter(final String filename, OutputStream out) {
try {
log.debug("Opening record writer for: {}", filename);
return new RecordWriter() {

Просмотреть файл

@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
import org.apache.kafka.connect.errors.DataException;
@ -9,12 +8,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
public class StringRecordWriterProvider implements RecordWriterProvider {
private static final Logger log = LoggerFactory.getLogger(StringRecordWriterProvider.class);
@Override
public RecordWriter getRecordWriter(String filename, CountingOutputStream out) {
public RecordWriter getRecordWriter(String filename, OutputStream out) {
return new RecordWriter() {
@Override

Просмотреть файл

@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
@ -14,6 +13,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
@ -38,7 +38,7 @@ public class AvroRecordWriterTest {
File file = new File("abc.avro");
AvroRecordWriterProvider writer = new AvroRecordWriterProvider();
FileOutputStream fos = new FileOutputStream(file);
CountingOutputStream out = new CountingOutputStream(fos);
OutputStream out = fos;
RecordWriter rd = writer.getRecordWriter(file.getPath(),out);
for(SinkRecord record : records){
rd.write(record);

Просмотреть файл

@ -1,16 +1,11 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.FileOutputStream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
@ -27,7 +22,7 @@ public class ByteArrayWriterProviderTest {
File file = new File("abc.bin");
ByteRecordWriterProvider writer = new ByteRecordWriterProvider();
FileOutputStream fos = new FileOutputStream(file);
CountingOutputStream out = new CountingOutputStream(fos);
OutputStream out = fos;
RecordWriter rd = writer.getRecordWriter(file.getPath(), out);
for(SinkRecord record : records){
rd.write(record);

Просмотреть файл

@ -1,15 +1,10 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.FileOutputStream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -30,7 +25,7 @@ public class JsonRecordWriterProviderTest {
File file = new File("abc.json");
JsonRecordWriterProvider jsonWriter = new JsonRecordWriterProvider();
FileOutputStream fos = new FileOutputStream(file);
CountingOutputStream out = new CountingOutputStream(fos);
OutputStream out = fos;
RecordWriter rd = jsonWriter.getRecordWriter(file.getPath(), out);
for(SinkRecord record : records){
rd.write(record);

Просмотреть файл

@ -1,15 +1,11 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.CountingOutputStream;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.FileOutputStream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
@ -26,7 +22,7 @@ public class StringRecordWriterProviderTest {
File file = new File("abc.txt");
StringRecordWriterProvider writer = new StringRecordWriterProvider();
FileOutputStream fos = new FileOutputStream(file);
CountingOutputStream out = new CountingOutputStream(fos);
OutputStream out = fos;
RecordWriter rd = writer.getRecordWriter(file.getPath(), out);
for(SinkRecord record : records){
rd.write(record);