diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java index c38fd64..a59401e 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java @@ -58,6 +58,7 @@ public class FileWriter implements Closeable { private RecordWriter recordWriter; private final IngestionProperties ingestionProps; private BehaviorOnError behaviorOnError; + private boolean shouldWriteAvroAsBytes = false; /** * @param basePath - This is path to which to write the files to. @@ -256,17 +257,12 @@ public class FileWriter implements Closeable { currentFile.rawBytes = recordWriter.getDataSize(); currentFile.zippedBytes += countingStream.numBytes; currentFile.numRecords++; - if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold || shouldWriteAvroAsBytes(record)) { + if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold || shouldWriteAvroAsBytes) { rotate(record.kafkaOffset()); resetFlushTimer(true); } } - private boolean shouldWriteAvroAsBytes(SinkRecord record) { - return (record.valueSchema().type() == Schema.Type.BYTES) && - (ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())); - } - public void initializeRecordWriter(SinkRecord record) { if (record.value() instanceof Map) { recordWriterProvider = new JsonRecordWriterProvider(); @@ -287,6 +283,9 @@ public class FileWriter implements Closeable { } else if ((record.valueSchema() != null) && (record.valueSchema().type() == Schema.Type.BYTES)){ recordWriterProvider = new ByteRecordWriterProvider(); + if(ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())) { + shouldWriteAvroAsBytes = true; + } } else { throw new ConnectException(String.format("Invalid Kafka record format, connector does not support %s format. This connector supports Avro, Json with schema, Json without schema, Byte, String format. ",record.valueSchema().type())); }