зеркало из
1
0
Форкнуть 0

add flush check for avro as byte

This commit is contained in:
hasher 2020-06-29 20:35:50 +05:30
Родитель 354b4514c5
Коммит 64578e2226
1 изменённых файлов: 6 добавлений и 1 удалений

Просмотреть файл

@ -256,12 +256,17 @@ public class FileWriter implements Closeable {
currentFile.rawBytes = recordWriter.getDataSize();
currentFile.zippedBytes += countingStream.numBytes;
currentFile.numRecords++;
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold) {
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold || shouldWriteAvroAsBytes(record)) {
rotate(record.kafkaOffset());
resetFlushTimer(true);
}
}
private boolean shouldWriteAvroAsBytes(SinkRecord record) {
return ((record.valueSchema().type() == Schema.Type.BYTES) &&
(ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())));
}
public void initializeRecordWriter(SinkRecord record) {
if (record.value() instanceof Map) {
recordWriterProvider = new JsonRecordWriterProvider();