Resolved conflicts | Ignoring E2E tests

fahad 2020-06-11 17:03:40 +05:30
Parents 112c1cb2ea 240f14d9cd
Commit 11cadcd508
9 changed files with 320 additions and 95 deletions

View file

@ -49,7 +49,6 @@ example configuration:
```config
name=KustoSinkConnector
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
kusto.sink.flush_interval_ms=300000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
tasks.max=1
@ -61,6 +60,7 @@ kusto.auth.appid=XXX
kusto.auth.appkey=XXX
kusto.sink.tempdir=/var/tmp/
kusto.sink.flush_size=1000
kusto.sink.flush_interval_ms=300000
```
Aggregation in the sink is done using files; these are sent to Kusto once the aggregated file reaches flush_size (in bytes) or once the flush_interval_ms interval has passed.
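
For illustration, here is a minimal sketch of how those two triggers interact; the class and method names below are made up for the example and are not part of the connector:

```java
// Minimal sketch of the two flush triggers described above: the aggregated
// file is rolled and sent once either the size or the staleness threshold
// is crossed, whichever comes first.
class FlushPolicy {
    private final long flushSizeBytes;   // kusto.sink.flush_size
    private final long flushIntervalMs;  // kusto.sink.flush_interval_ms

    FlushPolicy(long flushSizeBytes, long flushIntervalMs) {
        this.flushSizeBytes = flushSizeBytes;
        this.flushIntervalMs = flushIntervalMs;
    }

    // True once the aggregated file should be sent to Kusto.
    boolean shouldFlush(long aggregatedBytes, long millisSinceLastFlush) {
        return aggregatedBytes >= flushSizeBytes || millisSinceLastFlush >= flushIntervalMs;
    }
}
```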

View file

@ -8,7 +8,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<version>0.3.4</version>
<version>0.3.5</version>
<build>
<plugins>
<plugin>
@ -99,11 +99,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>

View file

@ -1,13 +1,15 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Function;
import org.apache.kafka.connect.errors.ConnectException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.GZIPOutputStream;
/**
@ -20,17 +22,19 @@ public class FileWriter implements Closeable {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
SourceFile currentFile;
private Timer timer;
private Consumer<SourceFile> onRollCallback;
private Function<SourceFile, String> onRollCallback;
private final long flushInterval;
private final boolean shouldCompressData;
private Supplier<String> getFilePath;
private Function<Long, String> getFilePath;
private OutputStream outputStream;
private String basePath;
private CountingOutputStream countingStream;
private long fileThreshold;
// Lock is given from TopicPartitionWriter to lock while ingesting
private ReentrantReadWriteLock reentrantReadWriteLock;
// Don't remove! File descriptor is kept so that the file is not deleted when stream is closed
private FileDescriptor currentFileDescriptor;
private String flushError;
/**
* @param basePath - The path to which the files are written.
@ -41,27 +45,38 @@ public class FileWriter implements Closeable {
*/
public FileWriter(String basePath,
long fileThreshold,
Consumer<SourceFile> onRollCallback,
Supplier<String> getFilePath,
Function<SourceFile, String> onRollCallback,
Function<Long, String> getFilePath,
long flushInterval,
boolean shouldCompressData) {
boolean shouldCompressData,
ReentrantReadWriteLock reentrantLock) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;
this.onRollCallback = onRollCallback;
this.flushInterval = flushInterval;
this.shouldCompressData = shouldCompressData;
// This is a fair lock so that we flush close to the time intervals
this.reentrantReadWriteLock = reentrantLock;
// If we failed on flush we want to throw the error from the put() flow.
flushError = null;
}
boolean isDirty() {
return this.currentFile != null && this.currentFile.rawBytes > 0;
}
public synchronized void write(byte[] data) throws IOException {
public synchronized void write(byte[] data, @Nullable Long offset) throws IOException {
if (flushError != null) {
throw new ConnectException(flushError);
}
if (data == null || data.length == 0) return;
if (currentFile == null) {
openFile();
openFile(offset);
resetFlushTimer(true);
}
@ -72,12 +87,12 @@ public class FileWriter implements Closeable {
currentFile.numRecords++;
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold) {
rotate();
rotate(offset);
resetFlushTimer(true);
}
}
public void openFile() throws IOException {
public void openFile(@Nullable Long offset) throws IOException {
SourceFile fileProps = new SourceFile();
File folder = new File(basePath);
@ -85,7 +100,7 @@ public class FileWriter implements Closeable {
throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
}
String filePath = getFilePath.get();
String filePath = getFilePath.apply(offset);
fileProps.path = filePath;
File file = new File(filePath);
@ -102,9 +117,9 @@ public class FileWriter implements Closeable {
currentFile = fileProps;
}
void rotate() throws IOException {
void rotate(@Nullable Long offset) throws IOException {
finishFile(true);
openFile();
openFile(offset);
}
void finishFile(Boolean delete) throws IOException {
@ -115,8 +130,10 @@ public class FileWriter implements Closeable {
} else {
outputStream.flush();
}
onRollCallback.accept(currentFile);
String err = onRollCallback.apply(currentFile);
if(err != null){
throw new ConnectException(err);
}
if (delete){
dumpFile();
}
@ -132,6 +149,7 @@ public class FileWriter implements Closeable {
if (!deleted) {
log.warn("couldn't delete temporary file. File exists: " + currentFile.file.exists());
}
currentFile = null;
}
public void rollback() throws IOException {
@ -174,22 +192,28 @@ public class FileWriter implements Closeable {
flushByTimeImpl();
}
};
timer.schedule(t, flushInterval);
if(timer != null) {
timer.schedule(t, flushInterval);
}
}
}
private void flushByTimeImpl() {
void flushByTimeImpl() {
try {
// Flush time interval gets the write lock so that it won't starve
reentrantReadWriteLock.writeLock().lock();
// Lock before the check so that if a writing process just flushed this won't ingest empty files
if (currentFile != null && currentFile.rawBytes > 0) {
rotate();
finishFile(true);
}
reentrantReadWriteLock.writeLock().unlock();
resetFlushTimer(false);
} catch (Exception e) {
String fileName = currentFile == null ? "no file created yet" : currentFile.file.getName();
long currentSize = currentFile == null ? 0 : currentFile.rawBytes;
log.error(String.format("Error in flushByTime. Current file: %s, size: %d. ", fileName, currentSize), e);
flushError = String.format("Error in flushByTime. Current file: %s, size: %d. ", fileName, currentSize);
log.error(flushError, e);
}
resetFlushTimer(false);
}
private class CountingOutputStream extends FilterOutputStream {
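
For context, a hedged usage sketch of the reworked FileWriter API after this change, mirroring the updated FileWriterTest further down in this commit. It assumes the sketch lives in the connector's com.microsoft.azure.kusto.kafka.connect.sink package; the path, size, and offset values are illustrative:

```java
package com.microsoft.azure.kusto.kafka.connect.sink;

import com.google.common.base.Function;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative wiring only: shows the new constructor and callback contract.
class FileWriterUsageSketch {
    void example() throws IOException {
        // Fair lock, shared with TopicPartitionWriter so timed flushes are not starved.
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

        // The roll callback now returns null on success, or an error string that
        // makes FileWriter throw a ConnectException from the put() flow.
        Function<SourceFile, String> onRoll = (SourceFile f) -> null;

        // The file-name generator now receives the offset of the record being
        // written (null when the roll was triggered by the flush interval).
        Function<Long, String> filePath = (Long offset) ->
                Paths.get("/var/tmp", "kafka_example_" + (offset == null ? 0L : offset) + ".csv").toString();

        FileWriter writer = new FileWriter("/var/tmp", 1000, onRoll, filePath, 300000, false, lock);
        writer.write("some,record\n".getBytes(StandardCharsets.UTF_8), 5L);
        writer.close();
    }
}
```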

View file

@ -57,6 +57,15 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String KUSTO_SINK_FLUSH_INTERVAL_MS_DOC = "Kusto sink max staleness in milliseconds (per topic+partition combo).";
private static final String KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY = "Maximum Flush Interval";
static final String KUSTO_COMMIT_IMMEDIATLY_CONF = "kusto.sink.commit";
private static final String KUSTO_COMMIT_IMMEDIATLY_DOC = "Whether the Kafka call to commit offsets will flush and commit the last offsets or only the ones already ingested";
private static final String KUSTO_COMMIT_IMMEDIATLY_DISPLAY = "kusto.sink.commit";
static final String KUSTO_RETRIES_COUNT_CONF = "kusto.sink.retries";
private static final String KUSTO_RETRIES_COUNT_DOC = "Number of retries on ingestions before throwing";
private static final String KUSTO_RETRIES_COUNT_DISPLAY = "kusto.sink.retries";
// Deprecated configs
static final String KUSTO_TABLES_MAPPING_CONF_DEPRECATED = "kusto.tables.topics_mapping";
static final String KUSTO_SINK_FLUSH_SIZE_BYTES_CONF_DEPRECATED = "kusto.sink.flush_size";
@ -64,7 +73,6 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String DEPRECATED_CONFIG_DOC = "This configuration has been deprecated.";
public KustoSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
@ -216,9 +224,28 @@ public class KustoSinkConfig extends AbstractConfig {
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY);
}
KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY)
.define(
KUSTO_COMMIT_IMMEDIATLY_CONF,
Type.BOOLEAN,
false,
Importance.LOW,
KUSTO_COMMIT_IMMEDIATLY_DOC,
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
KUSTO_COMMIT_IMMEDIATLY_DISPLAY)
.define(
KUSTO_RETRIES_COUNT_CONF,
Type.INT,
2,
Importance.LOW,
KUSTO_RETRIES_COUNT_DOC,
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
KUSTO_RETRIES_COUNT_DISPLAY);
}
public String getKustoUrl() {
return this.getString(KUSTO_URL_CONF);
@ -268,9 +295,18 @@ public class KustoSinkConfig extends AbstractConfig {
? getLong(KUSTO_SINK_FLUSH_INTERVAL_MS_CONF_DEPRECATED)
: getLong(KUSTO_SINK_FLUSH_INTERVAL_MS_CONF);
}
public boolean getKustoCommitImmediatly() {
return this.getBoolean(KUSTO_COMMIT_IMMEDIATLY_CONF);
}
public int getKustoRetriesCount() {
return this.getInt(KUSTO_RETRIES_COUNT_CONF);
}
public static void main(String[] args) {
System.out.println(getConfig().toEnrichedRst());
}
}
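
For reference, the two settings defined above can be placed in the connector properties alongside the existing sink options; the values shown are simply the defaults from these definitions (commit-immediately off, 2 ingestion retries):

```config
kusto.sink.commit=false
kusto.sink.retries=2
```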

View file

@ -34,6 +34,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Kusto sink uses file system to buffer records.
@ -51,6 +56,8 @@ public class KustoSinkTask extends SinkTask {
private long maxFileSize;
private long flushInterval;
private String tempDir;
private boolean commitImmediately;
private int retiresCount;
IngestClient kustoIngestClient;
Map<TopicPartition, TopicPartitionWriter> writers;
@ -153,6 +160,13 @@ public class KustoSinkTask extends SinkTask {
IngestionProperties props = new IngestionProperties(db, table);
if (format != null && !format.isEmpty()) {
if (format.equals("json") || format.equals("singlejson") || format.equalsIgnoreCase("multijson")) {
props.setDataFormat("multijson");
}
props.setDataFormat(format);
}
if (format != null && !format.isEmpty()) {
if (format.equals("json") || format.equals("singlejson")){
props.setDataFormat("multijson");
@ -285,7 +299,7 @@ public class KustoSinkTask extends SinkTask {
if (ingestionProps == null) {
throw new ConnectException(String.format("Kusto Sink has no ingestion props mapped for the topic: %s. please check your configuration.", tp.topic()));
} else {
TopicPartitionWriter writer = new TopicPartitionWriter(tp, kustoIngestClient, ingestionProps, tempDir, maxFileSize, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, kustoIngestClient, ingestionProps, tempDir, maxFileSize, flushInterval, commitImmediately, retiresCount);
writer.open();
writers.put(tp, writer);
@ -295,6 +309,7 @@ public class KustoSinkTask extends SinkTask {
@Override
public void close(Collection<TopicPartition> partitions) {
log.warn("KustoConnector got a request to close sink task");
for (TopicPartition tp : partitions) {
try {
writers.get(tp).close();
@ -309,15 +324,23 @@ public class KustoSinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
KustoSinkConfig config = new KustoSinkConfig(props);
String url = config.getKustoUrl();
validateTableMappings(config);
topicsToIngestionProps = getTopicsToIngestionProps(config);
// this should be read properly from settings
kustoIngestClient = createKustoIngestClient(config);
tempDir = config.getTempDirPath();
maxFileSize = config.getFlushSizeBytes();
flushInterval = config.getFlushInterval();
commitImmediately = config.getKustoCommitImmediatly();
retiresCount = config.getKustoRetriesCount();
log.info(String.format("Started KustoSinkTask with target cluster: (%s), source topics: (%s)",
url, topicsToIngestionProps.keySet().toString()));
open(context.assignment());
@ -340,9 +363,16 @@ public class KustoSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
log.debug("put '"+ records.size() + "' num of records");
int i = 0;
SinkRecord lastRecord = null;
for (SinkRecord record : records) {
log.debug("record to topic:" + record.topic());
if (i == 0) {
log.debug("First record is to topic:" + record.topic());
log.debug("First record offset:" + record.kafkaOffset());
}
lastRecord = record;
i++;
TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
TopicPartitionWriter writer = writers.get(tp);
@ -354,6 +384,11 @@ public class KustoSinkTask extends SinkTask {
writer.writeRecord(record);
}
if (lastRecord != null) {
log.debug("Last record was for topic:" + lastRecord.topic());
log.debug("Last record had offset:" + lastRecord.kafkaOffset());
}
}
// This is a neat trick, since our rolling files commit whenever they like, offsets may drift
@ -362,23 +397,38 @@ public class KustoSinkTask extends SinkTask {
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets
) {
log.info("preCommit called and sink is configured to " + (commitImmediately ? "flush and commit immediatly": "commit only successfully sent for ingestion offsets"));
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : assignment) {
if (commitImmediately) {
CompletableFuture[] tasks = new CompletableFuture[assignment.size()];
int i = 0;
for (TopicPartition tp : assignment) {
TopicPartitionWriter topicPartitionWriter = writers.get(tp);
tasks[i] = (CompletableFuture.runAsync(() -> topicPartitionWriter.fileWriter.flushByTimeImpl()));
}
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(tasks);
try {
voidCompletableFuture.get(4L, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("failed to flush some of the partition writers in 4 seconds before comitting");
}
} else {
for (TopicPartition tp : assignment) {
Long offset = writers.get(tp).lastCommittedOffset;
Long offset = writers.get(tp).lastCommittedOffset;
if (offset != null) {
log.debug("Forwarding to framework request to commit offset: {} for {}", offset, tp);
offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
if (offset != null) {
log.debug("Forwarding to framework request to commit offset: {} for {} while the offset is {}", offset, tp, offsets.get(tp));
offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
}
}
}
return offsetsToCommit;
return commitImmediately ? offsets : offsetsToCommit;
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws ConnectException {
// do nothing , rolling files can handle writing
}
}

View file

@ -7,15 +7,19 @@ import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TopicPartitionWriter {
class TopicPartitionWriter {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
private final CompressionType eventDataCompression;
private final TopicPartition tp;
@ -23,38 +27,69 @@ public class TopicPartitionWriter {
private final IngestionProperties ingestionProps;
private final String basePath;
private final long flushInterval;
private boolean commitImmediately;
private final long fileThreshold;
FileWriter fileWriter;
long currentOffset;
Long lastCommittedOffset;
private int defaultRetriesCount;
private int currentRetries;
private ReentrantReadWriteLock reentrantReadWriteLock;
TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps, String basePath, long fileThreshold, long flushInterval) {
TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps, String basePath,
long fileThreshold, long flushInterval, boolean commitImmediatly, int retriesCount) {
this.tp = tp;
this.client = client;
this.ingestionProps = ingestionProps.ingestionProperties;
this.fileThreshold = fileThreshold;
this.basePath = basePath;
this.flushInterval = flushInterval;
this.commitImmediately = commitImmediatly;
this.currentOffset = 0;
this.eventDataCompression = ingestionProps.eventDataCompression;
this.defaultRetriesCount = retriesCount;
this.currentRetries = retriesCount;
this.reentrantReadWriteLock = new ReentrantReadWriteLock(true);
}
public void handleRollFile(SourceFile fileDescriptor) {
String handleRollFile(SourceFile fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);
new ArrayList<>();
try {
client.ingestFromFile(fileSourceInfo, ingestionProps);
log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s)", fileDescriptor.path, fileDescriptor.rawBytes, currentOffset));
this.lastCommittedOffset = currentOffset;
currentRetries = defaultRetriesCount;
} catch (Exception e) {
log.error("Ingestion Failed for file : "+ fileDescriptor.file.getName() + ", message: " + e.getMessage() + "\nException : " + ExceptionUtils.getStackTrace(e));
if (commitImmediately) {
if (currentRetries > 0) {
try {
// The default commit timeout is 5 seconds.
Thread.sleep(1500);
} catch (InterruptedException e1) {
log.error("Couldn't sleep !");
}
log.error("Ingestion Failed for file : " + fileDescriptor.file.getName() + ", defaultRetriesCount left '" + defaultRetriesCount + "'. message: " + e.getMessage() + "\nException : " + ExceptionUtils.getStackTrace(e));
currentRetries--;
return handleRollFile(fileDescriptor);
} else {
currentRetries = defaultRetriesCount;
// Returning string will make the caller throw
return "Ingestion Failed for file : " + fileDescriptor.file.getName() + ", defaultRetriesCount left '" + defaultRetriesCount + "'. message: " + e.getMessage() + "\nException : " + ExceptionUtils.getStackTrace(e);
}
}
}
return null;
}
public String getFilePath() {
long nextOffset = fileWriter != null && fileWriter.isDirty() ? currentOffset + 1 : currentOffset;
String getFilePath(@Nullable Long offset) {
// Should be null if flushed by interval
offset = offset == null ? currentOffset : offset;
long nextOffset = fileWriter != null && fileWriter.isDirty() ? offset + 1 : offset;
String compressionExtension = "";
if (shouldCompressData(ingestionProps, null) || eventDataCompression != null) {
@ -68,7 +103,7 @@ public class TopicPartitionWriter {
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
}
public void writeRecord(SinkRecord record) {
void writeRecord(SinkRecord record) throws ConnectException {
byte[] value = null;
// TODO: should probably refactor this code out into a value transformer
@ -86,20 +121,30 @@ public class TopicPartitionWriter {
} else {
log.error(String.format("Unexpected value type, skipping record %s", record));
}
if (value == null) {
this.currentOffset = record.kafkaOffset();
} else {
try {
reentrantReadWriteLock.readLock().lock();
// Current offset is saved after flushing for the flush timer to use
fileWriter.write(value, record.kafkaOffset());
this.currentOffset = record.kafkaOffset();
fileWriter.write(value);
} catch (IOException e) {
log.error("File write failed", e);
} catch (ConnectException ex) {
if (commitImmediately) {
throw ex;
}
} catch (IOException ex) {
if (commitImmediately) {
throw new ConnectException("Got an IOException while writing to file with message: " + ex.getMessage());
}
} finally {
reentrantReadWriteLock.readLock().unlock();
}
}
}
public void open() {
void open() {
// Should compress binary files
boolean shouldCompressData = shouldCompressData(this.ingestionProps, this.eventDataCompression);
@ -109,10 +154,11 @@ public class TopicPartitionWriter {
this::handleRollFile,
this::getFilePath,
!shouldCompressData ? 0 : flushInterval,
shouldCompressData);
shouldCompressData,
reentrantReadWriteLock);
}
public void close() {
void close() {
try {
fileWriter.rollback();
// fileWriter.close(); TODO ?

View file

@ -25,6 +25,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.logging.Logger;
public class E2ETest {
private static final String testPrefix = "tmpKafkaE2ETest";
@ -35,8 +36,10 @@ public class E2ETest {
private String database = System.getProperty("database");
private String tableBaseName = System.getProperty("table", testPrefix + UUID.randomUUID().toString().replace('-', '_'));
private String basePath = Paths.get("src/test/resources/", "testE2E").toString();
private Logger log = Logger.getLogger(this.getClass().getName());
@Test
@Ignore
public void testE2ECsv() throws URISyntaxException, DataClientException, DataServiceException {
String table = tableBaseName + "csv";
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://%s.kusto.windows.net", cluster), appId, appKey, authority);
@ -67,7 +70,7 @@ public class E2ETest {
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
props.ingestionProperties.setIngestionMapping("mappy", IngestionMapping.IngestionMappingKind.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, Paths.get(basePath, "csv").toString(), fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, Paths.get(basePath, "csv").toString(), fileThreshold, flushInterval, false, 0);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
@ -115,7 +118,7 @@ public class E2ETest {
props2.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
props2.ingestionProperties.setIngestionMapping("avri", IngestionMapping.IngestionMappingKind.avro);
TopicPartition tp2 = new TopicPartition("testPartition2", 11);
TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, Paths.get(basePath, "avro").toString(), 10, 300000);
TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, Paths.get(basePath, "avro").toString(), 10, 300000, false, 0);
writer2.open();
List<SinkRecord> records2 = new ArrayList<SinkRecord>();
@ -155,5 +158,6 @@ public class E2ETest {
timeElapsedMs += sleepPeriodMs;
}
Assertions.assertEquals(res.getValues().get(0).get(0), expectedNumberOfRows.toString());
this.log.info("Succesfully ingested " + expectedNumberOfRows + " records.");
}
}

View file

@ -1,5 +1,6 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Function;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.After;
@ -12,8 +13,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@ -52,13 +52,13 @@ public class FileWriterTest {
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;
Consumer<SourceFile> trackFiles = (SourceFile f) -> {};
Function<SourceFile, String> trackFiles = (SourceFile f) -> null;
Supplier<String> generateFileName = () -> FILE_PATH;
Function<Long, String> generateFileName = (Long l) -> FILE_PATH;
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock());
fileWriter.openFile();
fileWriter.openFile(null);
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
Assert.assertEquals(fileWriter.currentFile.rawBytes, 0);
@ -82,15 +82,15 @@ public class FileWriterTest {
final int MAX_FILE_SIZE = 100;
Consumer<SourceFile> trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes);
Function<SourceFile, String> trackFiles = (SourceFile f) -> { files.put(f.path, f.rawBytes); return null;};
Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock());
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8));
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8), null);
}
Assert.assertEquals(files.size(), 4);
@ -121,15 +121,15 @@ public class FileWriterTest {
final int MAX_FILE_SIZE = 128 * 2;
Consumer<SourceFile> trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes);
Function<SourceFile, String> trackFiles = (SourceFile f) -> {files.put(f.path, f.rawBytes);return null;};
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock());
String msg = "Message";
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8));
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8), null);
Thread.sleep(1000);
Assert.assertEquals(files.size(), 0);
@ -141,18 +141,17 @@ public class FileWriterTest {
mkdirs = folder2.mkdirs();
Assert.assertTrue(mkdirs);
Supplier<String> generateFileName2 = () -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
Function<Long, String> generateFileName2 = (Long l) -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false);
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock());
String msg2 = "Second Message";
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8));
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), null);
Thread.sleep(1010);
Assert.assertEquals(files.size(), 2);
List<Long> sortedFiles = new ArrayList<>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
Assert.assertEquals(sortedFiles, Arrays.asList((long) 14, (long) 7));
@ -162,6 +161,74 @@ public class FileWriterTest {
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
}
public @Test void offsetCheckByInterval() throws InterruptedException, IOException {
// This test will check that lastCommitOffset is set to the right value, when ingests are done by flush interval.
// There will be a write operation followed by a flush which will track files and sleep.
// While it sleeps there will be another write attempt which should wait on the lock and another flush later.
// Resulting in first record to be with offset 1 and second with offset 2.
ArrayList<Map.Entry<String, Long>> files = new ArrayList<>();
final int MAX_FILE_SIZE = 128 * 2;
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
final ArrayList<Long> committedOffsets = new ArrayList<>();
class Offsets {
private long currentOffset = 0;
}
final Offsets offsets = new Offsets();
Function<SourceFile, String> trackFiles = (SourceFile f) -> {
committedOffsets.add(offsets.currentOffset);
files.add(new AbstractMap.SimpleEntry<>(f.path, f.rawBytes));
return null;
};
String path = Paths.get(currentDirectory.getPath(), "offsetCheckByInterval").toString();
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
Function<Long, String> generateFileName = (Long offset) -> {
if(offset == null){
offset = offsets.currentOffset;
}
return Paths.get(path, Long.toString(offset)).toString();
};
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock);
String msg2 = "Second Message";
reentrantReadWriteLock.readLock().lock();
long recordOffset = 1;
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), recordOffset);
offsets.currentOffset = recordOffset;
// Wake the flush by interval in the middle of the writing
Thread.sleep(510);
recordOffset = 2;
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), recordOffset);
offsets.currentOffset = recordOffset;
reentrantReadWriteLock.readLock().unlock();
// Context switch
Thread.sleep(10);
reentrantReadWriteLock.readLock().lock();
recordOffset = 3;
offsets.currentOffset = recordOffset;
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), recordOffset);
reentrantReadWriteLock.readLock().unlock();
Thread.sleep(510);
// Assertions
System.out.println(files.size());
Assert.assertEquals(files.size(), 2);
// Make sure that the first file is from offset 1 till 2 and second is from 3 till 3
Assert.assertEquals(files.stream().map(Map.Entry::getValue).toArray(Long[]::new), new Long[]{28L, 14L});
Assert.assertEquals(files.stream().map((s)->s.getKey().substring(path.length() + 1)).toArray(String[]::new), new String[]{"1", "3"});
Assert.assertEquals(committedOffsets, new ArrayList<Long>(){{add(2L);add(3L);}});
// make sure folder is clear once done
fileWriter2.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
}
@Test
public void testFileWriterCompressed() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
@ -177,22 +244,22 @@ public class FileWriterTest {
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
String msg = "Message";
Consumer<SourceFile> trackFiles = getAssertFileConsumer(msg);
Function<SourceFile, String> trackFiles = getAssertFileConsumer(msg);
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock());
gzipOutputStream.write(msg.getBytes());
gzipOutputStream.finish();
fileWriter.write(byteArrayOutputStream.toByteArray());
fileWriter.write(byteArrayOutputStream.toByteArray(), null);
fileWriter.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
}
static Consumer<SourceFile> getAssertFileConsumer(String msg) {
static Function<SourceFile, String> getAssertFileConsumer(String msg) {
return (SourceFile f) -> {
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
byte[] bytes = IOUtils.toByteArray(fileInputStream);
@ -217,6 +284,7 @@ public class FileWriterTest {
e.printStackTrace();
Assert.fail(e.getMessage());
}
return null;
};
}
}

View file

@ -1,5 +1,6 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Function;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
@ -59,7 +60,7 @@ public class TopicPartitionWriterTest {
IngestionProperties ingestionProperties = new IngestionProperties(db, table);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = ingestionProperties;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, basePath, fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, basePath, fileThreshold, flushInterval, false, 0);
SourceFile descriptor = new SourceFile();
descriptor.rawBytes = 1024;
@ -94,9 +95,9 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval, false, 0);
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_0.csv.gz").toString());
Assert.assertEquals(writer.getFilePath(null), Paths.get(basePath, "kafka_testTopic_11_0.csv.gz").toString());
}
@Test
@ -111,7 +112,7 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval, false, 0);
writer.open();
List<SinkRecord> records = new ArrayList<>();
@ -122,7 +123,7 @@ public class TopicPartitionWriterTest {
writer.writeRecord(record);
}
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_5.csv.gz").toString());
Assert.assertEquals(writer.getFilePath(null), Paths.get(basePath, "kafka_testTopic_11_5.csv.gz").toString());
}
@Test
@ -137,7 +138,7 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval, false, 0);
writer.open();
writer.close();
}
@ -181,7 +182,7 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval, false, 0);
writer.open();
@ -213,7 +214,7 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval, false, 0);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
@ -235,8 +236,8 @@ public class TopicPartitionWriterTest {
// Read
writer.fileWriter.finishFile(false);
Consumer<SourceFile> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
assertFileConsumer.accept(writer.fileWriter.currentFile);
Function<SourceFile, String> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
assertFileConsumer.apply(writer.fileWriter.currentFile);
writer.close();
}
@ -261,7 +262,7 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval, false, 0);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();