
Merge pull request #11 from Azure/compressedData

Compressed data
ohad bitton 2019-12-08 13:12:33 +02:00 committed by GitHub
Parents 6a494dc158 2aa7690c40
Commit 4cf4b7617c
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 346 additions and 259 deletions

View file

@@ -2,7 +2,7 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
import java.io.File;
public class GZIPFileDescriptor {
public class FileDescriptor {
long rawBytes = 0;
long zippedBytes = 0;
long numRecords = 0;
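
For orientation, a minimal sketch of the renamed descriptor as the rest of this diff appears to use it; the `path` and `file` fields are not shown in this hunk and are inferred from references such as `fileDescriptor.path` and `currentFile.file` further down, so treat them as assumptions.

```java
package com.microsoft.azure.kusto.kafka.connect.sink;

import java.io.File;

// Sketch only: the three counters come from the hunk above; path and file
// are inferred from their usages in FileWriter and TopicPartitionWriter below.
public class FileDescriptor {
    long rawBytes = 0;
    long zippedBytes = 0;
    long numRecords = 0;
    String path;
    File file;
}
```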

View file

@@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,15 +16,16 @@ import java.util.zip.GZIPOutputStream;
* Currently supports size based rolling, where size is for *uncompressed* size,
* so final size can vary.
*/
public class GZIPFileWriter implements Closeable {
public class FileWriter implements Closeable {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
public GZIPFileDescriptor currentFile;
public FileDescriptor currentFile;
private Timer timer;
private Consumer<GZIPFileDescriptor> onRollCallback;
private Consumer<FileDescriptor> onRollCallback;
private final long flushInterval;
private final boolean shouldCompressData;
private Supplier<String> getFilePath;
private GZIPOutputStream gzipStream;
private OutputStream outputStream;
private String basePath;
private CountingOutputStream fileStream;
private long fileThreshold;
@@ -35,17 +35,20 @@ public class GZIPFileWriter implements Closeable {
* @param fileThreshold - Max size, uncompressed bytes.
* @param onRollCallback - Callback to allow code to execute when rolling a file. Blocking code.
* @param getFilePath - Allow external resolving of file name.
* @param shouldCompressData - Should the FileWriter compress the incoming data
*/
public GZIPFileWriter(String basePath,
long fileThreshold,
Consumer<GZIPFileDescriptor> onRollCallback,
Supplier<String> getFilePath,
long flushInterval) {
public FileWriter(String basePath,
long fileThreshold,
Consumer<FileDescriptor> onRollCallback,
Supplier<String> getFilePath,
long flushInterval,
boolean shouldCompressData) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;
this.onRollCallback = onRollCallback;
this.flushInterval = flushInterval;
this.shouldCompressData = shouldCompressData;
}
public boolean isDirty() {
@@ -60,7 +63,7 @@ public class GZIPFileWriter implements Closeable {
resetFlushTimer(true);
}
gzipStream.write(data);
outputStream.write(data);
currentFile.rawBytes += data.length;
currentFile.zippedBytes += fileStream.numBytes;
@@ -73,27 +76,27 @@ public class GZIPFileWriter implements Closeable {
}
public void openFile() throws IOException {
GZIPFileDescriptor fileDescriptor = new GZIPFileDescriptor();
FileDescriptor fileDescriptor = new FileDescriptor();
File folder = new File(basePath);
if (!folder.exists() && !folder.mkdirs()) {
throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
}
String filePath = getFilePath.get() + ".gz";
String filePath = getFilePath.get();
fileDescriptor.path = filePath;
File file = new File(filePath);
file.createNewFile();
FileOutputStream fos = new FileOutputStream(file);
fos.getChannel().truncate(0);
fileStream = new CountingOutputStream(fos);
gzipStream = new GZIPOutputStream(fileStream);
outputStream = shouldCompressData ? new GZIPOutputStream(fileStream) : fileStream;
fileDescriptor.file = file;
currentFile = fileDescriptor;
}
@@ -104,18 +107,16 @@ public class GZIPFileWriter implements Closeable {
private void finishFile() throws IOException {
if (isDirty()) {
gzipStream.finish();
outputStream.close();
onRollCallback.accept(currentFile);
}
// closing late so that the success callback will have a chance to use the file.
gzipStream.close();
currentFile.file.delete();
}
public void rollback() throws IOException {
if (gzipStream != null) {
gzipStream.close();
if (outputStream != null) {
outputStream.close();
if (currentFile != null && currentFile.file != null) {
currentFile.file.delete();
}
@@ -123,9 +124,12 @@ public class GZIPFileWriter implements Closeable {
}
public void close() throws IOException {
if (timer!= null) {
timer.cancel();
timer.purge();
}
// Flush last file, updating index
timer.cancel();
timer.purge();
finishFile();
// Setting to null so subsequent calls to close won't write it again
@@ -171,7 +175,7 @@ public class GZIPFileWriter implements Closeable {
private class CountingOutputStream extends FilterOutputStream {
private long numBytes = 0;
CountingOutputStream(OutputStream out) throws IOException {
CountingOutputStream(OutputStream out) {
super(out);
}
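
To make the new boolean parameter concrete, here is a hedged usage sketch that is not part of this commit; the base path, thresholds, and callback body are illustrative placeholders, only the constructor signature follows the diff above.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

class FileWriterModesSketch {
    // Illustrative only: values are assumptions, not taken from the commit.
    void example() {
        Consumer<FileDescriptor> onRoll = fd -> System.out.println("rolled " + fd.path);
        Supplier<String> nextPath = () -> "/tmp/kusto-sink/kafka_topic1_0_0.csv";

        // shouldCompressData = true: FileWriter gzips the incoming bytes itself.
        FileWriter compressing = new FileWriter("/tmp/kusto-sink", 1024, onRoll, nextPath, 30000, true);

        // shouldCompressData = false: records already arrive compressed; bytes are written as-is.
        FileWriter passThrough = new FileWriter("/tmp/kusto-sink", 1024, onRoll, nextPath, 30000, false);
    }
}
```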

View file

@@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

View file

@@ -5,6 +5,8 @@ import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
@@ -27,7 +29,7 @@ import java.util.*;
public class KustoSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
private final Set<TopicPartition> assignment;
private Map<String, IngestionProperties> topicsToIngestionProps;
private Map<String, TopicIngestionProperties> topicsToIngestionProps;
IngestClient kustoIngestClient;
Map<TopicPartition, TopicPartitionWriter> writers;
private long maxFileSize;
@@ -71,8 +73,8 @@ public class KustoSinkTask extends SinkTask {
throw new ConfigException("Kusto authentication method must be provided.");
}
public static Map<String, IngestionProperties> getTopicsToIngestionProps(KustoSinkConfig config) throws ConfigException {
Map<String, IngestionProperties> result = new HashMap<>();
public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(KustoSinkConfig config) throws ConfigException {
Map<String, TopicIngestionProperties> result = new HashMap<>();
try {
if (config.getKustoTopicToTableMapping() != null) {
@@ -87,13 +89,14 @@ public class KustoSinkTask extends SinkTask {
String table = mapping.getString("table");
String format = mapping.optString("format");
CompressionType compressionType = StringUtils.isBlank(mapping.optString("eventDataCompression")) ? null : CompressionType.valueOf(mapping.optString("eventDataCompression"));
IngestionProperties props = new IngestionProperties(db, table);
if (format != null && !format.isEmpty()) {
// TODO:after java client reveals multijson - use only this for simplicity
// if (format.equals("json") || format.equals("singlejson")){
// props.setDataFormat("multijson");
// }
if (format.equals("json") || format.equals("singlejson")){
props.setDataFormat("multijson");
}
props.setDataFormat(format);
}
@@ -112,10 +115,12 @@ public class KustoSinkTask extends SinkTask {
}
}
}
result.put(mapping.getString("topic"), props);
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
topicIngestionProperties.eventDataCompression = compressionType;
topicIngestionProperties.ingestionProperties = props;
result.put(mapping.getString("topic"), topicIngestionProperties);
} catch (Exception ex) {
throw new ConfigException("Malformed topics to kusto ingestion props mappings");
throw new ConfigException("Malformed topics to kusto ingestion props mappings", ex);
}
}
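
For context, the `eventDataCompression` attribute parsed above comes from the topics-to-table mapping configuration; a minimal sketch of how it might be set, mirroring the JSON used in KustoSinkTaskTest further down (URL and other keys are illustrative):

```java
// Illustrative fragment only; the mapping string is the part exercised by this PR.
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING,
        "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv', 'eventDataCompression':'gz'}]");
```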
@@ -129,7 +134,7 @@ public class KustoSinkTask extends SinkTask {
throw new ConfigException("Malformed topics to kusto ingestion props mappings");
}
public IngestionProperties getIngestionProps(String topic) {
public TopicIngestionProperties getIngestionProps(String topic) {
return topicsToIngestionProps.get(topic);
}
@@ -142,7 +147,7 @@ public class KustoSinkTask extends SinkTask {
public void open(Collection<TopicPartition> partitions) throws ConnectException {
assignment.addAll(partitions);
for (TopicPartition tp : assignment) {
IngestionProperties ingestionProps = getIngestionProps(tp.topic());
TopicIngestionProperties ingestionProps = getIngestionProps(tp.topic());
log.debug(String.format("Open Kusto topic: '%s' with partition: '%s'", tp.topic(), tp.partition()));
if (ingestionProps == null) {
throw new ConnectException(String.format("Kusto Sink has no ingestion props mapped for the topic: %s. please check your configuration.", tp.topic()));
@@ -182,7 +187,6 @@ public class KustoSinkTask extends SinkTask {
tempDir = config.getKustoSinkTempDir();
maxFileSize = config.getKustoFlushSize();
flushInterval = config.getKustoFlushIntervalMS();
log.info(String.format("Kafka Kusto Sink started. target cluster: (%s), source topics: (%s)", url, topicsToIngestionProps.keySet().toString()));
open(context.assignment());

View file

@@ -0,0 +1,10 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
class TopicIngestionProperties {
IngestionProperties ingestionProperties;
CompressionType eventDataCompression = null;
}
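
The new holder class is populated per topic in getTopicsToIngestionProps above; for illustration only, a hedged sketch of that population (database, table, and compression values are assumptions):

```java
// Illustrative only, mirroring the population code shown earlier in this diff.
TopicIngestionProperties topicProps = new TopicIngestionProperties();
topicProps.ingestionProperties = new IngestionProperties("db1", "table1");
topicProps.eventDataCompression = CompressionType.gz;
```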

View file

@@ -2,6 +2,7 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.common.TopicPartition;
@@ -16,30 +17,30 @@ import java.nio.file.Paths;
public class TopicPartitionWriter {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
private TopicPartition tp;
private IngestClient client;
private final CompressionType eventDataCompression;
private final TopicPartition tp;
private final IngestClient client;
private final IngestionProperties ingestionProps;
private String basePath;
private long flushInterval;
private long fileThreshold;
private final String basePath;
private final long flushInterval;
private final long fileThreshold;
GZIPFileWriter gzipFileWriter;
FileWriter fileWriter;
long currentOffset;
Long lastCommittedOffset;
TopicPartitionWriter(
TopicPartition tp, IngestClient client, IngestionProperties ingestionProps, String basePath, long fileThreshold, long flushInterval
) {
TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps, String basePath, long fileThreshold, long flushInterval) {
this.tp = tp;
this.client = client;
this.ingestionProps = ingestionProps;
this.ingestionProps = ingestionProps.ingestionProperties;
this.fileThreshold = fileThreshold;
this.basePath = basePath;
this.flushInterval = flushInterval;
this.currentOffset = 0;
this.eventDataCompression = ingestionProps.eventDataCompression;
}
public void handleRollFile(GZIPFileDescriptor fileDescriptor) {
public void handleRollFile(FileDescriptor fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);
try {
@@ -52,8 +53,11 @@ public class TopicPartitionWriter {
}
public String getFilePath() {
long nextOffset = gzipFileWriter != null && gzipFileWriter.isDirty() ? currentOffset + 1 : currentOffset;
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat())).toString();
long nextOffset = fileWriter != null && fileWriter.isDirty() ? currentOffset + 1 : currentOffset;
// Output files are always compressed
String compressionExtension = this.eventDataCompression == null ? "gz" : this.eventDataCompression.toString();
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s.%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
}
public void writeRecord(SinkRecord record) {
@@ -79,7 +83,7 @@ public class TopicPartitionWriter {
this.currentOffset = record.kafkaOffset();
} else {
try {
gzipFileWriter.write(value);
fileWriter.write(value);
this.currentOffset = record.kafkaOffset();
} catch (IOException e) {
@@ -89,16 +93,23 @@ public class TopicPartitionWriter {
}
public void open() {
boolean flushImmediately = ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString()) ||
ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString());
boolean flushImmediately = ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString())
|| this.eventDataCompression != null;
gzipFileWriter = new GZIPFileWriter(basePath, fileThreshold, this::handleRollFile, this::getFilePath, flushImmediately ? 0: flushInterval);
fileWriter = new FileWriter(
basePath,
fileThreshold,
this::handleRollFile,
this::getFilePath,
flushImmediately ? 0 : flushInterval,
this.eventDataCompression == null);
}
public void close() {
try {
gzipFileWriter.rollback();
// gzipFileWriter.close(); TODO ?
fileWriter.rollback();
// fileWriter.close(); TODO ?
} catch (IOException e) {
e.printStackTrace();
}
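
Putting the TopicPartitionWriter changes together, the compression handling in open() and getFilePath() above reduces to roughly the following; this is a hedged restatement with `format` and `eventDataCompression` standing in for the fields used in the diff, not a copy of the source.

```java
// Sketch only: restates the decisions made in open() and getFilePath() above.
boolean flushImmediately =
        format.equals("avro") || format.equals("parquet") || eventDataCompression != null;
boolean shouldCompressData = eventDataCompression == null;  // gzip only data not already compressed
String extension = eventDataCompression == null ? "gz" : eventDataCompression.toString();
// e.g. kafka_topic1_0_5.csv.gz -- files handed to Kusto ingestion are always compressed
```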

View file

@@ -0,0 +1,220 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.testng.Assert;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class FileWriterTest {
private File currentDirectory;
@Before
public final void before() {
currentDirectory = new File(Paths.get(
System.getProperty("java.io.tmpdir"),
FileWriter.class.getSimpleName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
}
@After
public final void after() {
try {
FileUtils.deleteDirectory(currentDirectory);
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testOpen() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testWriterOpen").toString();
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;
Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> {};
Supplier<String> generateFileName = () -> FILE_PATH;
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
fileWriter.openFile();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
Assert.assertEquals(fileWriter.currentFile.rawBytes, 0);
Assert.assertEquals(fileWriter.currentFile.path, FILE_PATH + ".gz");
Assert.assertTrue(fileWriter.currentFile.file.canWrite());
fileWriter.rollback();
}
@Test
public void testGzipFileWriter() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter").toString();
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
HashMap<String, Long> files = new HashMap<>();
final int MAX_FILE_SIZE = 128;
Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString();
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8));
}
Assert.assertEquals(files.size(), 4);
// should still have 1 open file at this point...
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
// close current file
fileWriter.close();
Assert.assertEquals(files.size(), 5);
List<Long> sortedFiles = new ArrayList<>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
Assert.assertEquals(sortedFiles, Arrays.asList((long) 106, (long) 106, (long) 106, (long) 106, (long) 53));
// make sure folder is clear once done
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
}
@Test
public void testGzipFileWriterFlush() throws IOException, InterruptedException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
HashMap<String, Long> files = new HashMap<>();
final int MAX_FILE_SIZE = 128 * 2;
Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString();
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
String msg = "Message";
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8));
Thread.sleep(1000);
Assert.assertEquals(files.size(), 0);
fileWriter.close();
Assert.assertEquals(files.size(), 1);
String path2 = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2_2").toString();
File folder2 = new File(path2);
mkdirs = folder2.mkdirs();
Assert.assertTrue(mkdirs);
Supplier<String> generateFileName2 = () -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false);
String msg2 = "Second Message";
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8));
Thread.sleep(1010);
Assert.assertEquals(files.size(), 2);
List<Long> sortedFiles = new ArrayList<>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
Assert.assertEquals(sortedFiles, Arrays.asList((long) 14, (long) 7));
// make sure folder is clear once done
fileWriter2.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
}
@Test
public void testFileWriterCompressed() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
HashMap<String, Long> files = new HashMap<>();
final int MAX_FILE_SIZE = 128 * 2;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
String msg = "Message";
Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> {
try (FileInputStream fileInputStream = new FileInputStream(f.file)){
byte[] bytes = IOUtils.toByteArray(fileInputStream);
try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
GZIPInputStream gzipper = new GZIPInputStream(bin)){
byte[] buffer = new byte[1024];
ByteArrayOutputStream out = new ByteArrayOutputStream();
int len;
while ((len = gzipper.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
gzipper.close();
out.close();
String s = new String(out.toByteArray());
Assert.assertEquals(s, msg);
}
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
};
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, true);
gzipOutputStream.write(msg.getBytes());
gzipOutputStream.finish();
fileWriter.write(byteArrayOutputStream.toByteArray());
fileWriter.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
}
}

View file

@@ -1,170 +0,0 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.testng.Assert;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class GZIPFileWriterTest {
File currentDirectory;
@Before
public final void before() {
currentDirectory = new File(Paths.get(
System.getProperty("java.io.tmpdir"),
GZIPFileWriter.class.getSimpleName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
}
@After
public final void after() {
try {
FileUtils.deleteDirectory(currentDirectory);
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testOpen() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testWriterOpen").toString();
File folder = new File(path);
folder.mkdirs();
Assert.assertEquals(folder.listFiles().length, 0);
HashMap<String, Long> files = new HashMap<String, Long>();
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;
Consumer<GZIPFileDescriptor> trackFiles = (GZIPFileDescriptor f) -> {
files.put(f.path, f.rawBytes);
};
Supplier<String> generateFileName = () -> FILE_PATH;
GZIPFileWriter gzipFileWriter = new GZIPFileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
gzipFileWriter.openFile();
Assert.assertEquals(folder.listFiles().length, 1);
Assert.assertEquals(gzipFileWriter.currentFile.rawBytes, 0);
Assert.assertEquals(gzipFileWriter.currentFile.path, FILE_PATH + ".gz");
Assert.assertTrue(gzipFileWriter.currentFile.file.canWrite());
gzipFileWriter.rollback();
}
@Test
public void testGzipFileWriter() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter").toString();
File folder = new File(path);
folder.mkdirs();
Assert.assertEquals(folder.listFiles().length, 0);
HashMap<String, Long> files = new HashMap<String, Long>();
final int MAX_FILE_SIZE = 128;
Consumer<GZIPFileDescriptor> trackFiles = (GZIPFileDescriptor f) -> {
files.put(f.path, f.rawBytes);
};
Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString();
GZIPFileWriter gzipFileWriter = new GZIPFileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
gzipFileWriter.write(msg.getBytes("UTF-8"));
}
Assert.assertEquals(files.size(), 4);
// should still have 1 open file at this point...
Assert.assertEquals(folder.listFiles().length, 1);
// close current file
gzipFileWriter.close();
Assert.assertEquals(files.size(), 5);
List<Long> sortedFiles = new ArrayList<Long>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
Assert.assertEquals(sortedFiles, Arrays.asList((long) 106, (long) 106, (long) 106, (long) 106, (long) 53));
// make sure folder is clear once done
Assert.assertEquals(folder.listFiles().length, 0);
}
@Test
public void testGzipFileWriterFlush() throws IOException, InterruptedException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
File folder = new File(path);
folder.mkdirs();
HashMap<String, Long> files = new HashMap<String, Long>();
final int MAX_FILE_SIZE = 128 * 2;
Consumer<GZIPFileDescriptor> trackFiles = (GZIPFileDescriptor f) -> {
files.put(f.path, f.rawBytes);
};
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString();
// Expect no files to be ingested as size is small and flushInterval is big
GZIPFileWriter gzipFileWriter = new GZIPFileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
String msg = "Message";
gzipFileWriter.write(msg.getBytes(StandardCharsets.UTF_8));
Thread.sleep(1000);
Assert.assertEquals(files.size(), 0);
gzipFileWriter.close();
Assert.assertEquals(files.size(), 1);
String path2 = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
File folder2 = new File(path);
folder2.mkdirs();
Supplier<String> generateFileName2 = () -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
GZIPFileWriter gzipFileWriter2 = new GZIPFileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false);
String msg2 = "Second Message";
gzipFileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8));
Thread.sleep(1010);
Assert.assertEquals(files.size(), 2);
List<Long> sortedFiles = new ArrayList<Long>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
Assert.assertEquals(sortedFiles, Arrays.asList((long) 14, (long) 7));
// make sure folder is clear once done
gzipFileWriter2.close();
Assert.assertEquals(folder.listFiles().length, 0);
}
}

View file

@@ -1,5 +1,6 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -26,7 +27,7 @@ public class KustoSinkTaskTest {
public final void before() {
currentDirectory = new File(Paths.get(
System.getProperty("java.io.tmpdir"),
GZIPFileWriter.class.getName(),
FileWriter.class.getName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
}
@@ -39,7 +40,7 @@ public class KustoSinkTaskTest {
@Test
public void testSinkTaskOpen() throws Exception {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_URL, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
@@ -60,7 +61,7 @@ public class KustoSinkTaskTest {
@Test
public void testSinkTaskPutRecord() throws Exception {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_URL, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_SINK_TEMPDIR, System.getProperty("java.io.tmpdir"));
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'testing1','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
@@ -87,7 +88,7 @@ public class KustoSinkTaskTest {
@Test
public void testSinkTaskPutRecordMissingPartition() throws Exception {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_URL, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
@@ -125,7 +126,7 @@ public class KustoSinkTaskTest {
}
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_URL, "https://cluster_name.kusto.windows.net");
{
Throwable exception = assertThrows(ConnectException.class, () -> {
@@ -160,7 +161,7 @@ public class KustoSinkTaskTest {
@Test
public void sinkStartMissingAuth() {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_URL, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "[{'topic': 'testing1','db': 'db1', 'table': 'table1','format': 'csv'}]");
KustoSinkTask kustoSinkTask = new KustoSinkTask();
@@ -220,8 +221,8 @@ public class KustoSinkTaskTest {
@Test
public void getTable() {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_URL, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv', 'eventDataCompression':'gz'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
@@ -229,14 +230,15 @@ public class KustoSinkTaskTest {
kustoSinkTask.start(props);
{
// single table mapping should cause all topics to be mapped to a single table
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getDatabaseName(), "db1");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getTableName(), "table1");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getDataFormat(), "csv");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getDatabaseName(), "db2");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getTableName(), "table2");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getDataFormat(), "json");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getIngestionMapping().getIngestionMappingReference(), "Mapping");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic3"), null);
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").ingestionProperties.getDatabaseName(), "db1");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").ingestionProperties.getTableName(), "table1");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").ingestionProperties.getDataFormat(), "csv");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getDatabaseName(), "db2");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getTableName(), "table2");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getDataFormat(), "json");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference(), "Mapping");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").eventDataCompression, CompressionType.gz);
Assert.assertNull(kustoSinkTask.getIngestionProps("topic3"));
}
}
}

View file

@@ -23,14 +23,14 @@ import java.util.List;
import static org.mockito.Mockito.*;
public class TopicPartitionWriterTest {
// TODO: should probably find a better way to mock internal class (GZIPFileWriter)...
// TODO: should probably find a better way to mock internal class (FileWriter)...
File currentDirectory;
@Before
public final void before() {
currentDirectory = new File(Paths.get(
System.getProperty("java.io.tmpdir"),
GZIPFileWriter.class.getSimpleName(),
FileWriter.class.getSimpleName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
}
@@ -54,9 +54,11 @@ public class TopicPartitionWriterTest {
long fileThreshold = 100;
long flushInterval = 300000;
IngestionProperties ingestionProperties = new IngestionProperties(db, table);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, ingestionProperties, basePath, fileThreshold, flushInterval);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = ingestionProperties;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, basePath, fileThreshold, flushInterval);
GZIPFileDescriptor descriptor = new GZIPFileDescriptor();
FileDescriptor descriptor = new FileDescriptor();
descriptor.rawBytes = 1024;
descriptor.path = "somepath/somefile";
descriptor.file = new File ("C://myfile.txt");
@@ -86,8 +88,9 @@ public class TopicPartitionWriterTest {
String basePath = "somepath";
long fileThreshold = 100;
long flushInterval = 300000;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, new IngestionProperties(db, table), basePath, fileThreshold, flushInterval);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_0").toString());
}
@@ -101,8 +104,9 @@ public class TopicPartitionWriterTest {
String basePath = "somepath";
long fileThreshold = 100;
long flushInterval = 300000;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, new IngestionProperties(db, table), basePath, fileThreshold, flushInterval);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
writer.open();
List<SinkRecord> records = new ArrayList<>();
@@ -125,8 +129,9 @@ public class TopicPartitionWriterTest {
String basePath = "somepath";
long fileThreshold = 100;
long flushInterval = 300000;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient,new IngestionProperties(db,table), basePath, fileThreshold, flushInterval);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
writer.open();
writer.close();
}
@@ -166,8 +171,9 @@ public class TopicPartitionWriterTest {
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
long fileThreshold = 100;
long flushInterval = 300000;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, new IngestionProperties(db,table), basePath, fileThreshold, flushInterval);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
writer.open();
@@ -180,7 +186,7 @@ public class TopicPartitionWriterTest {
writer.writeRecord(record);
}
Assert.assertEquals(writer.gzipFileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 0)).toString());
Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 0)).toString());
}
@Test
@@ -192,8 +198,9 @@ public class TopicPartitionWriterTest {
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
long fileThreshold = 50;
long flushInterval = 300000;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, new IngestionProperties(db,table), basePath, fileThreshold, flushInterval);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
@@ -212,6 +219,6 @@ public class TopicPartitionWriterTest {
//TODO : file threshold ignored?
Assert.assertTrue(writer.lastCommittedOffset.equals((long) 15));
Assert.assertEquals(writer.currentOffset, 17);
Assert.assertEquals(writer.gzipFileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 16)).toString());
Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 16)).toString());
}
}