fix and test
This commit is contained in:
Родитель
665e186028
Коммит
e3801df32c
|
@ -37,7 +37,7 @@ public class FileWriter implements Closeable {
|
|||
* @param getFilePath - Allow external resolving of file name.
|
||||
* @param shouldCompressData - Should the FileWriter compress the incoming data
|
||||
*/
|
||||
public FileWriter(String basePath,
|
||||
public FileWriter(String basePath,
|
||||
long fileThreshold,
|
||||
Consumer<FileDescriptor> onRollCallback,
|
||||
Supplier<String> getFilePath,
|
||||
|
@ -101,17 +101,18 @@ public class FileWriter implements Closeable {
|
|||
}
|
||||
|
||||
void rotate() throws IOException {
|
||||
finishFile();
|
||||
finishFile(true);
|
||||
openFile();
|
||||
}
|
||||
|
||||
private void finishFile() throws IOException {
|
||||
void finishFile(boolean delete) throws IOException {
|
||||
outputStream.close();
|
||||
if (isDirty()) {
|
||||
onRollCallback.accept(currentFile);
|
||||
}
|
||||
|
||||
currentFile.file.delete();
|
||||
if(delete){
|
||||
currentFile.file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
public void rollback() throws IOException {
|
||||
|
@ -130,7 +131,7 @@ public class FileWriter implements Closeable {
|
|||
}
|
||||
|
||||
// Flush last file, updating index
|
||||
finishFile();
|
||||
finishFile(true);
|
||||
|
||||
// Setting to null so subsequent calls to close won't write it again
|
||||
currentFile = null;
|
||||
|
|
|
@ -177,11 +177,27 @@ public class FileWriterTest {
|
|||
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
|
||||
String msg = "Message";
|
||||
|
||||
Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> {
|
||||
try (FileInputStream fileInputStream = new FileInputStream(f.file)){
|
||||
Consumer<FileDescriptor> trackFiles = getAssertFileConsomer(msg);
|
||||
|
||||
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
|
||||
|
||||
// Expect no files to be ingested as size is small and flushInterval is big
|
||||
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false);
|
||||
|
||||
gzipOutputStream.write(msg.getBytes());
|
||||
gzipOutputStream.finish();
|
||||
fileWriter.write(byteArrayOutputStream.toByteArray());
|
||||
|
||||
fileWriter.close();
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
}
|
||||
|
||||
static Consumer<FileDescriptor> getAssertFileConsomer(String msg) {
|
||||
return (FileDescriptor f) -> {
|
||||
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
|
||||
byte[] bytes = IOUtils.toByteArray(fileInputStream);
|
||||
try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
|
||||
GZIPInputStream gzipper = new GZIPInputStream(bin)){
|
||||
GZIPInputStream gzipper = new GZIPInputStream(bin)) {
|
||||
|
||||
byte[] buffer = new byte[1024];
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
|
@ -202,19 +218,5 @@ public class FileWriterTest {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
};
|
||||
|
||||
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
|
||||
|
||||
// Expect no files to be ingested as size is small and flushInterval is big
|
||||
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false);
|
||||
|
||||
gzipOutputStream.write(msg.getBytes());
|
||||
gzipOutputStream.finish();
|
||||
fileWriter.write(byteArrayOutputStream.toByteArray());
|
||||
|
||||
fileWriter.close();
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package com.microsoft.azure.kusto.kafka.connect.sink;
|
||||
|
||||
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClient;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionProperties;
|
||||
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
@ -14,11 +16,12 @@ import org.testng.Assert;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
|
@ -28,11 +31,12 @@ public class TopicPartitionWriterTest {
|
|||
|
||||
@Before
|
||||
public final void before() {
|
||||
currentDirectory = new File(Paths.get(
|
||||
System.getProperty("java.io.tmpdir"),
|
||||
FileWriter.class.getSimpleName(),
|
||||
String.valueOf(Instant.now().toEpochMilli())
|
||||
).toString());
|
||||
currentDirectory = new File("C:\\Users\\ohbitton\\Desktop\\clients - backup");
|
||||
// currentDirectory = new File(Paths.get(
|
||||
// System.getProperty("java.io.tmpdir"),
|
||||
// FileWriter.class.getSimpleName(),
|
||||
// String.valueOf(Instant.now().toEpochMilli())
|
||||
// ).toString());
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -196,20 +200,27 @@ public class TopicPartitionWriterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWriteBytesValuesAndOffset() {
|
||||
public void testWriteBytesValuesAndOffset() throws IOException, URISyntaxException {
|
||||
TopicPartition tp = new TopicPartition("testPartition", 11);
|
||||
IngestClient mockClient = mock(IngestClient.class);
|
||||
String db = "testdb1";
|
||||
String table = "testtable1";
|
||||
String ClientID ="d5e0a24c-3a09-40ce-a1d6-dc5ab58dae66";
|
||||
String pass = "L+0hoM34kqC22XRniWOgkETwVvawiir2odEjYqZeyXA=";
|
||||
String auth = "72f988bf-86f1-41af-91ab-2d7cd011db47";
|
||||
// IngestClient client = IngestClientFactory.createClient(ConnectionStringBuilder.createWithDeviceCodeCredentials("https://ingest-ohbitton.kusto.windows.net"));
|
||||
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials("https://ingest-ohbitton.dev.kusto.windows.net", ClientID, pass, auth);
|
||||
IngestClient client = IngestClientFactory.createClient(csb);
|
||||
|
||||
IngestionProperties ingestionProperties = new IngestionProperties("ohtst","TestTable2");
|
||||
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
|
||||
String[] messages = new String[]{ "stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"};
|
||||
|
||||
// Expect to finish file after writing forth message cause of fileThreshhold
|
||||
long fileThreshold = messages[0].length() + messages[1].length() + messages[2].length() + messages[2].length() - 1;
|
||||
long flushInterval = 300000;
|
||||
TopicIngestionProperties props = new TopicIngestionProperties();
|
||||
props.ingestionProperties = new IngestionProperties(db, table);
|
||||
props.ingestionProperties = ingestionProperties;
|
||||
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
|
||||
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
|
||||
TopicPartitionWriter writer = new TopicPartitionWriter(tp, client, props, basePath, fileThreshold, flushInterval);
|
||||
|
||||
writer.open();
|
||||
List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
|
@ -225,6 +236,13 @@ public class TopicPartitionWriterTest {
|
|||
|
||||
Assert.assertEquals((long) writer.lastCommittedOffset, (long) 15);
|
||||
Assert.assertEquals(writer.currentOffset, 16);
|
||||
Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 16, IngestionProperties.DATA_FORMAT.csv.name())).toString());
|
||||
|
||||
String currentFileName = writer.fileWriter.currentFile.path;
|
||||
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 16, IngestionProperties.DATA_FORMAT.csv.name())).toString());
|
||||
|
||||
// Read
|
||||
writer.fileWriter.finishFile(false);
|
||||
Consumer<FileDescriptor> assertFileConsumer = FileWriterTest.getAssertFileConsomer(messages[2] + "\n");
|
||||
assertFileConsumer.accept(writer.fileWriter.currentFile);
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче