зеркало из
1
0
Форкнуть 0

Refactor flush logic for avro as byte

This commit is contained in:
hasher 2020-06-30 12:19:46 +05:30
Родитель 9e5f495748
Коммит 79b4e0a89d
1 изменённых файлов: 5 добавлений и 6 удалений

Просмотреть файл

@ -58,6 +58,7 @@ public class FileWriter implements Closeable {
private RecordWriter recordWriter;
private final IngestionProperties ingestionProps;
private BehaviorOnError behaviorOnError;
private boolean shouldWriteAvroAsBytes = false;
/**
* @param basePath - This is path to which to write the files to.
@ -256,17 +257,12 @@ public class FileWriter implements Closeable {
currentFile.rawBytes = recordWriter.getDataSize();
currentFile.zippedBytes += countingStream.numBytes;
currentFile.numRecords++;
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold || shouldWriteAvroAsBytes(record)) {
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold || shouldWriteAvroAsBytes) {
rotate(record.kafkaOffset());
resetFlushTimer(true);
}
}
private boolean shouldWriteAvroAsBytes(SinkRecord record) {
return (record.valueSchema().type() == Schema.Type.BYTES) &&
(ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString()));
}
public void initializeRecordWriter(SinkRecord record) {
if (record.value() instanceof Map) {
recordWriterProvider = new JsonRecordWriterProvider();
@ -287,6 +283,9 @@ public class FileWriter implements Closeable {
}
else if ((record.valueSchema() != null) && (record.valueSchema().type() == Schema.Type.BYTES)){
recordWriterProvider = new ByteRecordWriterProvider();
if(ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())) {
shouldWriteAvroAsBytes = true;
}
} else {
throw new ConnectException(String.format("Invalid Kafka record format, connector does not support %s format. This connector supports Avro, Json with schema, Json without schema, Byte, String format. ",record.valueSchema().type()));
}