Ohad Bitton 2020-01-29 14:03:17 +02:00
Parent e3801df32c
Commit db2ad56a3b
4 changed files with 75 additions and 33 deletions

View file

@@ -101,18 +101,24 @@ public class FileWriter implements Closeable {
     }
 
     void rotate() throws IOException {
-        finishFile(true);
+        finishFile();
         openFile();
     }
 
-    void finishFile(boolean delete) throws IOException {
-        outputStream.close();
+    void finishFile() throws IOException {
         if (isDirty()) {
+            if(shouldCompressData){
+                GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
+                gzip.finish();
+            } else {
+                outputStream.flush();
+            }
+            fileStream.close();
             onRollCallback.accept(currentFile);
         }
-        if(delete){
-            currentFile.file.delete();
-        }
+        // closing late so that the success callback will have a chance to use the file. This is a real thing on debug?!
+        outputStream.close();
     }
 
     public void rollback() throws IOException {
@@ -131,7 +137,7 @@ public class FileWriter implements Closeable {
     }
 
         // Flush last file, updating index
-        finishFile(true);
+        finishFile();
         // Setting to null so subsequent calls to close won't write it again
         currentFile = null;
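Why this change works: GZIPOutputStream.close() also closes the stream it wraps, while finish() writes the remaining compressed data and the gzip trailer but leaves the underlying stream open. That lets onRollCallback hand off a complete, valid .gz file before anything is closed. A minimal standalone sketch of the distinction (class and variable names are illustrative, not from the connector):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class GzipFinishSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(sink);
        gzip.write("payload".getBytes());

        // finish() flushes the deflater and writes the gzip trailer,
        // but unlike close() it does NOT close the wrapped stream.
        gzip.finish();

        // 'sink' now holds a complete gzip member that a roll callback
        // could consume before anything is closed.
        System.out.println("compressed size: " + sink.size() + " bytes");

        gzip.close(); // closing afterwards also closes 'sink'
    }
}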

View file

@@ -45,6 +45,7 @@ public class TopicPartitionWriter {
         try {
             client.ingestFromFile(fileSourceInfo, ingestionProps);
+            fileDescriptor.file.delete();
             log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s)", fileDescriptor.path, fileDescriptor.rawBytes, currentOffset));
             this.lastCommittedOffset = currentOffset;
         } catch (Exception e) {
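One caveat on the added cleanup line: File.delete() returns false on failure instead of throwing, so a failed delete here would silently leak files in the temp directory. A hedged alternative (not the committed code) that surfaces the reason is java.nio's Files.delete:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class IngestCleanupSketch {
    // Hypothetical helper: remove the rolled file after a successful
    // ingestion call, reporting failures instead of dropping the boolean.
    static void deleteIngestedFile(File file) {
        try {
            Files.delete(file.toPath()); // throws IOException with a cause, unlike File.delete()
        } catch (IOException e) {
            System.err.printf("failed to delete %s: %s%n", file, e.getMessage());
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("ingested", ".csv.gz");
        deleteIngestedFile(tmp);
    }
}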

View file

@@ -177,7 +177,7 @@ public class FileWriterTest {
         GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
         String msg = "Message";
 
-        Consumer<FileDescriptor> trackFiles = getAssertFileConsomer(msg);
+        Consumer<FileDescriptor> trackFiles = getAssertFileConsumer(msg);
 
         Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
@@ -192,7 +192,7 @@ public class FileWriterTest {
         Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
     }
 
-    static Consumer<FileDescriptor> getAssertFileConsomer(String msg) {
+    static Consumer<FileDescriptor> getAssertFileConsumer(String msg) {
         return (FileDescriptor f) -> {
             try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
                 byte[] bytes = IOUtils.toByteArray(fileInputStream);

View file

@@ -1,12 +1,12 @@
 package com.microsoft.azure.kusto.kafka.connect.sink;
 
-import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
 import com.microsoft.azure.kusto.ingest.IngestClient;
-import com.microsoft.azure.kusto.ingest.IngestClientFactory;
 import com.microsoft.azure.kusto.ingest.IngestionProperties;
+import com.microsoft.azure.kusto.ingest.source.CompressionType;
 import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.After;
 import org.junit.Before;
@@ -14,29 +14,29 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.testng.Assert;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.zip.GZIPOutputStream;
 
 import static org.mockito.Mockito.*;
 
 public class TopicPartitionWriterTest {
     // TODO: should probably find a better way to mock internal class (FileWriter)...
-    File currentDirectory;
+    private File currentDirectory;
 
     @Before
     public final void before() {
-        currentDirectory = new File("C:\\Users\\ohbitton\\Desktop\\clients - backup");
-//        currentDirectory = new File(Paths.get(
-//                System.getProperty("java.io.tmpdir"),
-//                FileWriter.class.getSimpleName(),
-//                String.valueOf(Instant.now().toEpochMilli())
-//        ).toString());
+        currentDirectory = new File(Paths.get(
+                System.getProperty("java.io.tmpdir"),
+                FileWriter.class.getSimpleName(),
+                String.valueOf(Instant.now().toEpochMilli())
+        ).toString());
     }
 
     @After
@@ -200,27 +200,21 @@ public class TopicPartitionWriterTest {
     }
 
     @Test
-    public void testWriteBytesValuesAndOffset() throws IOException, URISyntaxException {
+    public void testWriteStringValuesAndOffset() throws IOException {
         TopicPartition tp = new TopicPartition("testPartition", 11);
         IngestClient mockClient = mock(IngestClient.class);
-        String ClientID ="d5e0a24c-3a09-40ce-a1d6-dc5ab58dae66";
-        String pass = "L+0hoM34kqC22XRniWOgkETwVvawiir2odEjYqZeyXA=";
-        String auth = "72f988bf-86f1-41af-91ab-2d7cd011db47";
-//        IngestClient client = IngestClientFactory.createClient(ConnectionStringBuilder.createWithDeviceCodeCredentials("https://ingest-ohbitton.kusto.windows.net"));
-        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials("https://ingest-ohbitton.dev.kusto.windows.net", ClientID, pass, auth);
-        IngestClient client = IngestClientFactory.createClient(csb);
-        IngestionProperties ingestionProperties = new IngestionProperties("ohtst","TestTable2");
+        String db = "testdb1";
+        String table = "testtable1";
         String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
         String[] messages = new String[]{ "stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"};
 
-        // Expect to finish file after writing forth message cause of fileThreshhold
+        // Expect to finish file after writing forth message cause of fileThreshold
         long fileThreshold = messages[0].length() + messages[1].length() + messages[2].length() + messages[2].length() - 1;
         long flushInterval = 300000;
         TopicIngestionProperties props = new TopicIngestionProperties();
-        props.ingestionProperties = ingestionProperties;
+        props.ingestionProperties = new IngestionProperties(db, table);
         props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
-        TopicPartitionWriter writer = new TopicPartitionWriter(tp, client, props, basePath, fileThreshold, flushInterval);
+        TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
         writer.open();
         List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -241,8 +235,49 @@ public class TopicPartitionWriterTest {
         Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 16, IngestionProperties.DATA_FORMAT.csv.name())).toString());
 
         // Read
-        writer.fileWriter.finishFile(false);
-        Consumer<FileDescriptor> assertFileConsumer = FileWriterTest.getAssertFileConsomer(messages[2] + "\n");
+        writer.fileWriter.finishFile();
+        Consumer<FileDescriptor> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
         assertFileConsumer.accept(writer.fileWriter.currentFile);
     }
 
+    @Test
+    public void testWriteBytesValuesAndOffset() throws IOException {
+        TopicPartition tp = new TopicPartition("testPartition", 11);
+        IngestClient mockClient = mock(IngestClient.class);
+        String db = "testdb1";
+        String table = "testtable1";
+        String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
+        String[] messages = new String[]{ "stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"};
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream);
+        for (String msg: messages){
+            byte[] data = msg.getBytes();
+            gzipOutputStream.write(data);
+        }
+        gzipOutputStream.finish();
+
+        // Expect to finish file with one record although fileThreshold is high
+        long fileThreshold = 128;
+        long flushInterval = 300000;
+        TopicIngestionProperties props = new TopicIngestionProperties();
+        props.ingestionProperties = new IngestionProperties(db, table);
+        props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
+        props.eventDataCompression = CompressionType.gz;
+        TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
+        writer.open();
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+
+        records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.BYTES_SCHEMA, outputStream.toByteArray(), 10));
+
+        for (SinkRecord record : records) {
+            writer.writeRecord(record);
+        }
+
+        Assert.assertEquals((long) writer.lastCommittedOffset, (long) 10);
+        Assert.assertEquals(writer.currentOffset, 10);
+
+        String currentFileName = writer.fileWriter.currentFile.path;
+        Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 11, IngestionProperties.DATA_FORMAT.csv.name())).toString());
+    }
 }
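The new byte-array test asserts only the rolled file's name, and getAssertFileConsumer appears to compare raw file bytes, which for a .gz file means comparing compressed output. To assert on the decompressed payload instead, one could round-trip through GZIPInputStream; a self-contained sketch of such a check (helper name is hypothetical, not part of the test suite):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipContentCheckSketch {
    // Hypothetical helper: read a .gz file and return the decompressed bytes.
    static byte[] gunzip(Path path) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(Files.newInputStream(path));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("check", ".gz");
        try (GZIPOutputStream gz = new GZIPOutputStream(Files.newOutputStream(tmp))) {
            gz.write("stringy message\n".getBytes());
        }
        String content = new String(gunzip(tmp));
        if (!content.equals("stringy message\n")) {
            throw new AssertionError("unexpected content: " + content);
        }
        Files.delete(tmp);
    }
}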