Merge pull request #14 from Azure/fixCompressedDataAndTests
Fix compressed data
This commit is contained in:
Коммит
e9a78c9d68
13
pom.xml
13
pom.xml
|
@ -32,6 +32,11 @@
|
|||
<target>8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>2.21.0</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<resources>
|
||||
<resource>
|
||||
|
@ -113,7 +118,7 @@
|
|||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.4</version>
|
||||
<version>4.7</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
@ -133,6 +138,12 @@
|
|||
<version>RELEASE</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<version>1.9.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<repositories>
|
||||
<repository>
|
||||
|
|
|
@ -19,7 +19,7 @@ import java.util.zip.GZIPOutputStream;
|
|||
public class FileWriter implements Closeable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
|
||||
public FileDescriptor currentFile;
|
||||
FileDescriptor currentFile;
|
||||
private Timer timer;
|
||||
private Consumer<FileDescriptor> onRollCallback;
|
||||
private final long flushInterval;
|
||||
|
@ -27,7 +27,7 @@ public class FileWriter implements Closeable {
|
|||
private Supplier<String> getFilePath;
|
||||
private OutputStream outputStream;
|
||||
private String basePath;
|
||||
private CountingOutputStream fileStream;
|
||||
private CountingOutputStream countingStream;
|
||||
private long fileThreshold;
|
||||
|
||||
/**
|
||||
|
@ -51,7 +51,7 @@ public class FileWriter implements Closeable {
|
|||
this.shouldCompressData = shouldCompressData;
|
||||
}
|
||||
|
||||
public boolean isDirty() {
|
||||
boolean isDirty() {
|
||||
return this.currentFile != null && this.currentFile.rawBytes > 0;
|
||||
}
|
||||
|
||||
|
@ -66,10 +66,10 @@ public class FileWriter implements Closeable {
|
|||
outputStream.write(data);
|
||||
|
||||
currentFile.rawBytes += data.length;
|
||||
currentFile.zippedBytes += fileStream.numBytes;
|
||||
currentFile.zippedBytes += countingStream.numBytes;
|
||||
currentFile.numRecords++;
|
||||
|
||||
if (this.flushInterval == 0 || (currentFile.rawBytes + data.length) > fileThreshold) {
|
||||
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold) {
|
||||
rotate();
|
||||
resetFlushTimer(true);
|
||||
}
|
||||
|
@ -93,9 +93,8 @@ public class FileWriter implements Closeable {
|
|||
FileOutputStream fos = new FileOutputStream(file);
|
||||
fos.getChannel().truncate(0);
|
||||
|
||||
fileStream = new CountingOutputStream(fos);
|
||||
outputStream = shouldCompressData ? new GZIPOutputStream(fileStream) : fileStream;
|
||||
|
||||
countingStream = new CountingOutputStream(fos);
|
||||
outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream;
|
||||
fileDescriptor.file = file;
|
||||
currentFile = fileDescriptor;
|
||||
}
|
||||
|
@ -105,13 +104,20 @@ public class FileWriter implements Closeable {
|
|||
openFile();
|
||||
}
|
||||
|
||||
private void finishFile() throws IOException {
|
||||
void finishFile() throws IOException {
|
||||
if(isDirty()){
|
||||
outputStream.close();
|
||||
if(shouldCompressData){
|
||||
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
|
||||
gzip.finish();
|
||||
} else {
|
||||
outputStream.flush();
|
||||
}
|
||||
|
||||
onRollCallback.accept(currentFile);
|
||||
}
|
||||
|
||||
currentFile.file.delete();
|
||||
// closing late so that the success callback will have a chance to use the file. This is a real thing on debug?!
|
||||
outputStream.close();
|
||||
}
|
||||
|
||||
public void rollback() throws IOException {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package com.microsoft.azure.kusto.kafka.connect.sink;
|
||||
|
||||
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClient;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionProperties;
|
||||
import com.microsoft.azure.kusto.ingest.source.CompressionType;
|
||||
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
||||
|
@ -45,6 +47,7 @@ public class TopicPartitionWriter {
|
|||
|
||||
try {
|
||||
client.ingestFromFile(fileSourceInfo, ingestionProps);
|
||||
|
||||
log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s)", fileDescriptor.path, fileDescriptor.rawBytes, currentOffset));
|
||||
this.lastCommittedOffset = currentOffset;
|
||||
} catch (Exception e) {
|
||||
|
@ -55,9 +58,16 @@ public class TopicPartitionWriter {
|
|||
public String getFilePath() {
|
||||
long nextOffset = fileWriter != null && fileWriter.isDirty() ? currentOffset + 1 : currentOffset;
|
||||
|
||||
// Output files are always compressed
|
||||
String compressionExtension = this.eventDataCompression == null ? "gz" : this.eventDataCompression.toString();
|
||||
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s.%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
|
||||
String compressionExtension = "";
|
||||
if (shouldCompressData(ingestionProps, null) || eventDataCompression != null) {
|
||||
if(eventDataCompression != null) {
|
||||
compressionExtension = "." + eventDataCompression.toString();
|
||||
} else {
|
||||
compressionExtension = ".gz";
|
||||
}
|
||||
}
|
||||
|
||||
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
|
||||
}
|
||||
|
||||
public void writeRecord(SinkRecord record) {
|
||||
|
@ -83,9 +93,8 @@ public class TopicPartitionWriter {
|
|||
this.currentOffset = record.kafkaOffset();
|
||||
} else {
|
||||
try {
|
||||
fileWriter.write(value);
|
||||
|
||||
this.currentOffset = record.kafkaOffset();
|
||||
fileWriter.write(value);
|
||||
} catch (IOException e) {
|
||||
log.error("File write failed", e);
|
||||
}
|
||||
|
@ -93,18 +102,16 @@ public class TopicPartitionWriter {
|
|||
}
|
||||
|
||||
public void open() {
|
||||
boolean flushImmediately = ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())
|
||||
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString())
|
||||
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.orc.toString())
|
||||
|| this.eventDataCompression != null;
|
||||
// Should compress binary files
|
||||
boolean shouldCompressData = shouldCompressData(this.ingestionProps, this.eventDataCompression);
|
||||
|
||||
fileWriter = new FileWriter(
|
||||
basePath,
|
||||
fileThreshold,
|
||||
this::handleRollFile,
|
||||
this::getFilePath,
|
||||
flushImmediately ? 0 : flushInterval,
|
||||
this.eventDataCompression == null);
|
||||
!shouldCompressData ? 0 : flushInterval,
|
||||
shouldCompressData);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
@ -115,4 +122,11 @@ public class TopicPartitionWriter {
|
|||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
static boolean shouldCompressData(IngestionProperties ingestionProps, CompressionType eventDataCompression) {
|
||||
return !(ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())
|
||||
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString())
|
||||
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.orc.toString())
|
||||
|| eventDataCompression != null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ public class FileWriterTest {
|
|||
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
|
||||
Assert.assertEquals(fileWriter.currentFile.rawBytes, 0);
|
||||
Assert.assertEquals(fileWriter.currentFile.path, FILE_PATH + ".gz");
|
||||
Assert.assertEquals(fileWriter.currentFile.path, FILE_PATH);
|
||||
Assert.assertTrue(fileWriter.currentFile.file.canWrite());
|
||||
|
||||
fileWriter.rollback();
|
||||
|
@ -80,7 +80,7 @@ public class FileWriterTest {
|
|||
|
||||
HashMap<String, Long> files = new HashMap<>();
|
||||
|
||||
final int MAX_FILE_SIZE = 128;
|
||||
final int MAX_FILE_SIZE = 100;
|
||||
|
||||
Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
|
||||
|
||||
|
@ -177,7 +177,23 @@ public class FileWriterTest {
|
|||
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
|
||||
String msg = "Message";
|
||||
|
||||
Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> {
|
||||
Consumer<FileDescriptor> trackFiles = getAssertFileConsumer(msg);
|
||||
|
||||
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
|
||||
|
||||
// Expect no files to be ingested as size is small and flushInterval is big
|
||||
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false);
|
||||
|
||||
gzipOutputStream.write(msg.getBytes());
|
||||
gzipOutputStream.finish();
|
||||
fileWriter.write(byteArrayOutputStream.toByteArray());
|
||||
|
||||
fileWriter.close();
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
}
|
||||
|
||||
static Consumer<FileDescriptor> getAssertFileConsumer(String msg) {
|
||||
return (FileDescriptor f) -> {
|
||||
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
|
||||
byte[] bytes = IOUtils.toByteArray(fileInputStream);
|
||||
try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
|
||||
|
@ -202,19 +218,5 @@ public class FileWriterTest {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
};
|
||||
|
||||
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
|
||||
|
||||
// Expect no files to be ingested as size is small and flushInterval is big
|
||||
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, true);
|
||||
|
||||
gzipOutputStream.write(msg.getBytes());
|
||||
gzipOutputStream.finish();
|
||||
fileWriter.write(byteArrayOutputStream.toByteArray());
|
||||
|
||||
fileWriter.close();
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,9 +2,11 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
|
|||
|
||||
import com.microsoft.azure.kusto.ingest.IngestClient;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionProperties;
|
||||
import com.microsoft.azure.kusto.ingest.source.CompressionType;
|
||||
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.sink.SinkRecord;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -12,27 +14,31 @@ import org.junit.Test;
|
|||
import org.mockito.ArgumentCaptor;
|
||||
import org.testng.Assert;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TopicPartitionWriterTest {
|
||||
// TODO: should probably find a better way to mock internal class (FileWriter)...
|
||||
File currentDirectory;
|
||||
private File currentDirectory;
|
||||
|
||||
@Before
|
||||
public final void before() {
|
||||
currentDirectory = new File(Paths.get(
|
||||
System.getProperty("java.io.tmpdir"),
|
||||
FileWriter.class.getSimpleName(),
|
||||
String.valueOf(Instant.now().toEpochMilli())
|
||||
).toString());
|
||||
// currentDirectory = new File(Paths.get(
|
||||
// System.getProperty("java.io.tmpdir"),
|
||||
// FileWriter.class.getSimpleName(),
|
||||
// String.valueOf(Instant.now().toEpochMilli())
|
||||
// ).toString());
|
||||
currentDirectory = new File("C:\\Users\\ohbitton\\Desktop");
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -89,10 +95,12 @@ public class TopicPartitionWriterTest {
|
|||
long fileThreshold = 100;
|
||||
long flushInterval = 300000;
|
||||
TopicIngestionProperties props = new TopicIngestionProperties();
|
||||
|
||||
props.ingestionProperties = new IngestionProperties(db, table);
|
||||
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
|
||||
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
|
||||
|
||||
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_0").toString());
|
||||
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_0.csv.gz").toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -106,6 +114,7 @@ public class TopicPartitionWriterTest {
|
|||
long flushInterval = 300000;
|
||||
TopicIngestionProperties props = new TopicIngestionProperties();
|
||||
props.ingestionProperties = new IngestionProperties(db, table);
|
||||
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
|
||||
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
|
||||
writer.open();
|
||||
List<SinkRecord> records = new ArrayList<>();
|
||||
|
@ -117,7 +126,7 @@ public class TopicPartitionWriterTest {
|
|||
writer.writeRecord(record);
|
||||
}
|
||||
|
||||
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_5").toString());
|
||||
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_5.csv.gz").toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -131,6 +140,7 @@ public class TopicPartitionWriterTest {
|
|||
long flushInterval = 300000;
|
||||
TopicIngestionProperties props = new TopicIngestionProperties();
|
||||
props.ingestionProperties = new IngestionProperties(db, table);
|
||||
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
|
||||
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
|
||||
writer.open();
|
||||
writer.close();
|
||||
|
@ -162,63 +172,111 @@ public class TopicPartitionWriterTest {
|
|||
// Assert.assertEquals(writer.getFilePath(), "kafka_testPartition_11_0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteStringyValuesAndOffset() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("testTopic", 2);
|
||||
IngestClient mockClient = mock(IngestClient.class);
|
||||
String db = "testdb1";
|
||||
String table = "testtable1";
|
||||
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
|
||||
long fileThreshold = 100;
|
||||
long flushInterval = 300000;
|
||||
TopicIngestionProperties props = new TopicIngestionProperties();
|
||||
props.ingestionProperties = new IngestionProperties(db, table);
|
||||
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
|
||||
|
||||
|
||||
writer.open();
|
||||
List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message", 3));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}", 4));
|
||||
|
||||
for (SinkRecord record : records) {
|
||||
writer.writeRecord(record);
|
||||
}
|
||||
|
||||
Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 0)).toString());
|
||||
}
|
||||
// @Test
|
||||
// public void testWriteStringyValuesAndOffset() throws Exception {
|
||||
// TopicPartition tp = new TopicPartition("testTopic", 2);
|
||||
// IngestClient mockClient = mock(IngestClient.class);
|
||||
// String db = "testdb1";
|
||||
// String table = "testtable1";
|
||||
// String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
|
||||
// long fileThreshold = 100;
|
||||
// long flushInterval = 300000;
|
||||
// TopicIngestionProperties props = new TopicIngestionProperties();
|
||||
//
|
||||
// props.ingestionProperties = new IngestionProperties(db, table);
|
||||
// props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
|
||||
// TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
|
||||
//
|
||||
//
|
||||
// writer.open();
|
||||
// List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
//
|
||||
// records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message", 3));
|
||||
// records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}", 4));
|
||||
//
|
||||
// for (SinkRecord record : records) {
|
||||
// writer.writeRecord(record);
|
||||
// }
|
||||
//
|
||||
// Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 3, IngestionProperties.DATA_FORMAT.csv.name())).toString());
|
||||
// }
|
||||
|
||||
@Test
|
||||
public void testWriteBytesValuesAndOffset() throws Exception {
|
||||
public void testWriteStringValuesAndOffset() throws IOException {
|
||||
TopicPartition tp = new TopicPartition("testPartition", 11);
|
||||
IngestClient mockClient = mock(IngestClient.class);
|
||||
String db = "testdb1";
|
||||
String table = "testtable1";
|
||||
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
|
||||
long fileThreshold = 50;
|
||||
String[] messages = new String[]{ "stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"};
|
||||
|
||||
// Expect to finish file after writing forth message cause of fileThreshold
|
||||
long fileThreshold = messages[0].length() + messages[1].length() + messages[2].length() + messages[2].length() - 1;
|
||||
long flushInterval = 300000;
|
||||
TopicIngestionProperties props = new TopicIngestionProperties();
|
||||
props.ingestionProperties = new IngestionProperties(db, table);
|
||||
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
|
||||
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
|
||||
|
||||
writer.open();
|
||||
List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message".getBytes(StandardCharsets.UTF_8), 13));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 14));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 15));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 16));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 17));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, messages[0], 10));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, messages[1], 13));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, messages[2], 14));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, messages[2], 15));
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, messages[2], 16));
|
||||
|
||||
for (SinkRecord record : records) {
|
||||
writer.writeRecord(record);
|
||||
}
|
||||
|
||||
//TODO : file threshold ignored?
|
||||
Assert.assertTrue(writer.lastCommittedOffset.equals((long) 15));
|
||||
Assert.assertEquals(writer.currentOffset, 17);
|
||||
Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 16)).toString());
|
||||
Assert.assertEquals((long) writer.lastCommittedOffset, (long) 15);
|
||||
Assert.assertEquals(writer.currentOffset, 16);
|
||||
|
||||
String currentFileName = writer.fileWriter.currentFile.path;
|
||||
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 16, IngestionProperties.DATA_FORMAT.csv.name())).toString());
|
||||
|
||||
// Read
|
||||
writer.fileWriter.finishFile();
|
||||
Consumer<FileDescriptor> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
|
||||
assertFileConsumer.accept(writer.fileWriter.currentFile);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteBytesValuesAndOffset() throws IOException {
|
||||
TopicPartition tp = new TopicPartition("testPartition", 11);
|
||||
IngestClient mockClient = mock(IngestClient.class);
|
||||
|
||||
String db = "testdb1";
|
||||
String table = "testtable1";
|
||||
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
|
||||
FileInputStream fis = new FileInputStream("C:\\Users\\ohbitton\\source\\Workspaces\\Kusto\\Main\\Test\\UT\\Kusto.Engine.UT\\Common\\Kusto.Common.Svc\\Stream\\dataset3.avro");
|
||||
ByteArrayOutputStream o = new ByteArrayOutputStream();
|
||||
int content;
|
||||
while ((content = fis.read()) != -1) {
|
||||
// convert to char and display it
|
||||
o.write(content);
|
||||
}
|
||||
// Expect to finish file with one record although fileThreshold is high
|
||||
long fileThreshold = 128;
|
||||
long flushInterval = 300000;
|
||||
TopicIngestionProperties props = new TopicIngestionProperties();
|
||||
props.ingestionProperties = new IngestionProperties(db, table);
|
||||
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
|
||||
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
|
||||
|
||||
writer.open();
|
||||
List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.BYTES_SCHEMA, o.toByteArray(), 10));
|
||||
|
||||
for (SinkRecord record : records) {
|
||||
writer.writeRecord(record);
|
||||
}
|
||||
|
||||
Assert.assertEquals((long) writer.lastCommittedOffset, (long) 10);
|
||||
Assert.assertEquals(writer.currentOffset, 10);
|
||||
|
||||
String currentFileName = writer.fileWriter.currentFile.path;
|
||||
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 11, IngestionProperties.DATA_FORMAT.csv.name())).toString());
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче