This commit is contained in:
Родитель
8546f30679
Коммит
862c2ef373
|
@ -94,7 +94,7 @@ public class FileWriter implements Closeable {
|
|||
fos.getChannel().truncate(0);
|
||||
|
||||
countingStream = new CountingOutputStream(fos);
|
||||
outputStream = shouldCompressData ? new GZIPOutputStream(fos) : countingStream;
|
||||
outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream;
|
||||
fileDescriptor.file = file;
|
||||
currentFile = fileDescriptor;
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ public class FileWriter implements Closeable {
|
|||
void finishFile() throws IOException {
|
||||
if(isDirty()){
|
||||
if(shouldCompressData){
|
||||
GZIPOutputStream gzip = (GZIPOutputStream )outputStream;
|
||||
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
|
||||
gzip.finish();
|
||||
} else {
|
||||
outputStream.flush();
|
||||
|
|
|
@ -58,18 +58,16 @@ public class TopicPartitionWriter {
|
|||
public String getFilePath() {
|
||||
long nextOffset = fileWriter != null && fileWriter.isDirty() ? currentOffset + 1 : currentOffset;
|
||||
|
||||
// Output files are always compressed
|
||||
String compressionExtension;
|
||||
if (this.eventDataCompression == null) {
|
||||
if(shouldCompressData(ingestionProps, null)){
|
||||
compressionExtension = ".gz";
|
||||
} else {
|
||||
compressionExtension = "";
|
||||
}
|
||||
} else {
|
||||
compressionExtension = "." + this.eventDataCompression.toString();
|
||||
}
|
||||
|
||||
String compressionExtension = "";
|
||||
|
||||
if (shouldCompressData(ingestionProps, null) || eventDataCompression != null) {
|
||||
if(eventDataCompression != null) {
|
||||
compressionExtension = "." + eventDataCompression.toString();
|
||||
} else {
|
||||
compressionExtension = ".gz";
|
||||
}
|
||||
}
|
||||
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче