Merge pull request #6 from SanchayGupta1197/error-handling-dlq-retries-test-fix

fixed test cases
This commit is contained in:
Fahad Sheikh 2020-06-16 15:57:16 +05:30 коммит произвёл GitHub
Родитель 3c2b7e6c56 b0d7d52237
Коммит b777c6847d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 149 добавлений и 51 удалений

Просмотреть файл

@ -71,7 +71,7 @@
<kafka.version>1.0.0</kafka.version>
<json.version>20090211</json.version>
<commonio.version>2.6</commonio.version>
<kusto-sdk.version>2.1.0</kusto-sdk.version>
<kusto-sdk.version>1.4.2</kusto-sdk.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>

Просмотреть файл

@ -3,7 +3,6 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.Results;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
@ -182,15 +181,15 @@ public class KustoSinkTask extends SinkTask {
if (mappingRef != null && !mappingRef.isEmpty()) {
if (format != null) {
if (format.equals(IngestionProperties.DATA_FORMAT.json.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.json);
} else if (format.equals(IngestionProperties.DATA_FORMAT.avro.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.avro);
} else if (format.equals(IngestionProperties.DATA_FORMAT.parquet.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Parquet);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.parquet);
} else if (format.equals(IngestionProperties.DATA_FORMAT.orc.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Orc);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.orc);
} else {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Csv);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.csv);
}
}
}

Просмотреть файл

@ -242,7 +242,6 @@ class TopicPartitionWriter {
static boolean shouldCompressData(IngestionProperties ingestionProps, CompressionType eventDataCompression) {
return !(ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.apacheavro.toString())
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString())
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.orc.toString())
|| eventDataCompression != null);

Просмотреть файл

@ -20,7 +20,9 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.logging.Logger;
@ -34,9 +36,11 @@ public class E2ETest {
private String tableBaseName = System.getProperty("table", testPrefix + UUID.randomUUID().toString().replace('-', '_'));
private String basePath = Paths.get("src/test/resources/", "testE2E").toString();
private Logger log = Logger.getLogger(this.getClass().getName());
private static final Boolean COMMIT_IMMEDIATELY = true;
@Test
// @Ignore
@Ignore
public void testE2ECsv() throws URISyntaxException, DataClientException, DataServiceException {
String table = tableBaseName + "csv";
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://%s.kusto.windows.net", cluster), appId, appKey, authority);
@ -60,14 +64,20 @@ public class E2ETest {
String[] messages = new String[]{"stringy message,1", "another,2"};
// Expect to finish file after writing forth message cause of fileThreshold
long fileThreshold = messages[0].length() + 1;
long flushInterval = 0;
long fileThreshold = 100;
long flushInterval = 100;
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = ingestionProperties;
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
props.ingestionProperties.setIngestionMapping("mappy", IngestionMapping.IngestionMappingKind.Csv);
props.ingestionProperties.setIngestionMapping("mappy", IngestionMapping.IngestionMappingKind.csv);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, String.format("https://ingest-%s.kusto.windows.net", cluster));
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, Paths.get(basePath, "csv").toString());
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, Paths.get(basePath, "csv").toString(), fileThreshold, flushInterval);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, COMMIT_IMMEDIATELY, config);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
@ -90,7 +100,7 @@ public class E2ETest {
}
@Test
// @Ignore
@Ignore
public void testE2EAvro() throws URISyntaxException, DataClientException, DataServiceException {
String table = tableBaseName + "avro";
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://%s.kusto.windows.net", cluster), appId, appKey, authority);
@ -113,9 +123,15 @@ public class E2ETest {
TopicIngestionProperties props2 = new TopicIngestionProperties();
props2.ingestionProperties = ingestionProperties;
props2.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
props2.ingestionProperties.setIngestionMapping("avri", IngestionMapping.IngestionMappingKind.Avro);
props2.ingestionProperties.setIngestionMapping("avri", IngestionMapping.IngestionMappingKind.avro);
TopicPartition tp2 = new TopicPartition("testPartition2", 11);
TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, Paths.get(basePath, "avro").toString(), 10, 300000);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, String.format("https://ingest-%s.kusto.windows.net", cluster));
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, Paths.get(basePath, "avro").toString());
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(100));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(300000));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, COMMIT_IMMEDIATELY, config);
writer2.open();
List<SinkRecord> records2 = new ArrayList<SinkRecord>();
@ -142,21 +158,18 @@ public class E2ETest {
private void validateExpectedResults(Client engineClient, Integer expectedNumberOfRows, String table) throws InterruptedException, DataClientException, DataServiceException {
String query = String.format("%s | count", table);
KustoResultSetTable res = engineClient.execute(database, query).getPrimaryResults();
res.next();
Results res = engineClient.execute(database, query);
Integer timeoutMs = 60 * 6 * 1000;
Integer rowCount = res.getInt(0);
Integer rowCount = 0;
Integer timeElapsedMs = 0;
Integer sleepPeriodMs = 5 * 1000;
while (rowCount < expectedNumberOfRows && timeElapsedMs < timeoutMs) {
Thread.sleep(sleepPeriodMs);
res = engineClient.execute(database, query).getPrimaryResults();
res.next();
rowCount = res.getInt(0);
res = engineClient.execute(database, query);
rowCount = Integer.valueOf(res.getValues().get(0).get(0));
timeElapsedMs += sleepPeriodMs;
}
Assertions.assertEquals(rowCount, expectedNumberOfRows);
this.log.info("Succesfully ingested " + expectedNumberOfRows + " records.");
Assertions.assertEquals(res.getValues().get(0).get(0), expectedNumberOfRows.toString());
}
}

Просмотреть файл

@ -3,6 +3,8 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Function;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -14,6 +16,7 @@ import java.nio.file.Paths;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@ -42,7 +45,6 @@ public class FileWriterTest {
@Test
public void testOpen() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testWriterOpen").toString();
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
@ -52,7 +54,7 @@ public class FileWriterTest {
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;
Function<SourceFile, String> trackFiles = (SourceFile f) -> null;
Consumer<SourceFile> trackFiles = (SourceFile f) -> {};
Function<Long, String> generateFileName = (Long l) -> FILE_PATH;
@ -71,7 +73,7 @@ public class FileWriterTest {
@Test
public void testGzipFileWriter() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter").toString();
SinkRecord record = new SinkRecord("TestTopic", 1, null, null, null, "random message", 1);
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
@ -82,7 +84,7 @@ public class FileWriterTest {
final int MAX_FILE_SIZE = 100;
Function<SourceFile, String> trackFiles = (SourceFile f) -> { files.put(f.path, f.rawBytes); return null;};
Consumer<SourceFile> trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes);
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";
@ -90,7 +92,7 @@ public class FileWriterTest {
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8), null);
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8), record);
}
Assert.assertEquals(files.size(), 4);
@ -121,7 +123,7 @@ public class FileWriterTest {
final int MAX_FILE_SIZE = 128 * 2;
Function<SourceFile, String> trackFiles = (SourceFile f) -> {files.put(f.path, f.rawBytes);return null;};
Consumer<SourceFile> trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes);
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";
@ -129,7 +131,9 @@ public class FileWriterTest {
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock());
String msg = "Message";
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8), null);
SinkRecord record = new SinkRecord("TestTopic", 1, null, null, null, msg, 1);
fileWriter.write(msg.getBytes(StandardCharsets.UTF_8), record);
Thread.sleep(1000);
Assert.assertEquals(files.size(), 0);
@ -146,8 +150,9 @@ public class FileWriterTest {
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock());
String msg2 = "Second Message";
SinkRecord record2 = new SinkRecord("TestTopic", 1, null, null, null, msg2, 1);
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), null);
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), record2);
Thread.sleep(1010);
Assert.assertEquals(files.size(), 2);
@ -161,7 +166,8 @@ public class FileWriterTest {
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
}
public @Test void offsetCheckByInterval() throws InterruptedException, IOException {
@Test
public void offsetCheckByInterval() throws InterruptedException, IOException {
// This test will check that lastCommitOffset is set to the right value, when ingests are done by flush interval.
// There will be a write operation followed by a flush which will track files and sleep.
// While it sleeps there will be another write attempt which should wait on the lock and another flush later.
@ -175,10 +181,10 @@ public class FileWriterTest {
private long currentOffset = 0;
}
final Offsets offsets = new Offsets();
Function<SourceFile, String> trackFiles = (SourceFile f) -> {
Consumer<SourceFile> trackFiles = (SourceFile f) -> {
committedOffsets.add(offsets.currentOffset);
files.add(new AbstractMap.SimpleEntry<>(f.path, f.rawBytes));
return null;
//return null;
};
String path = Paths.get(currentDirectory.getPath(), "offsetCheckByInterval").toString();
@ -195,13 +201,17 @@ public class FileWriterTest {
String msg2 = "Second Message";
reentrantReadWriteLock.readLock().lock();
long recordOffset = 1;
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), recordOffset);
SinkRecord record = new SinkRecord("TestTopic", 1, null, null, null, msg2, recordOffset);
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), record);
offsets.currentOffset = recordOffset;
// Wake the flush by interval in the middle of the writing
Thread.sleep(510);
recordOffset = 2;
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), recordOffset);
SinkRecord record2 = new SinkRecord("TestTopic", 1, null, null, null, msg2, recordOffset);
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), record2);
offsets.currentOffset = recordOffset;
reentrantReadWriteLock.readLock().unlock();
@ -209,8 +219,10 @@ public class FileWriterTest {
Thread.sleep(10);
reentrantReadWriteLock.readLock().lock();
recordOffset = 3;
SinkRecord record3 = new SinkRecord("TestTopic", 1, null, null, null, msg2, recordOffset);
offsets.currentOffset = recordOffset;
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), recordOffset);
fileWriter2.write(msg2.getBytes(StandardCharsets.UTF_8), record3);
reentrantReadWriteLock.readLock().unlock();
Thread.sleep(510);
@ -231,6 +243,7 @@ public class FileWriterTest {
@Test
public void testFileWriterCompressed() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
SinkRecord record = new SinkRecord("TestTopic", 1, null, null, null, "random message", 1);
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
@ -243,7 +256,7 @@ public class FileWriterTest {
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
String msg = "Message";
Function<SourceFile, String> trackFiles = getAssertFileConsumer(msg);
Consumer<SourceFile> trackFiles = getAssertFileConsumer(msg);
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
@ -252,13 +265,13 @@ public class FileWriterTest {
gzipOutputStream.write(msg.getBytes());
gzipOutputStream.finish();
fileWriter.write(byteArrayOutputStream.toByteArray(), null);
fileWriter.write(byteArrayOutputStream.toByteArray(), record);
fileWriter.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
}
static Function<SourceFile, String> getAssertFileConsumer(String msg) {
static Function<SourceFile, String> getAssertFileConsumerFunction(String msg) {
return (SourceFile f) -> {
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
byte[] bytes = IOUtils.toByteArray(fileInputStream);
@ -286,4 +299,32 @@ public class FileWriterTest {
return null;
};
}
static Consumer<SourceFile> getAssertFileConsumer(String msg) {
return (SourceFile f) -> {
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
byte[] bytes = IOUtils.toByteArray(fileInputStream);
try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
GZIPInputStream gzipper = new GZIPInputStream(bin)) {
byte[] buffer = new byte[1024];
ByteArrayOutputStream out = new ByteArrayOutputStream();
int len;
while ((len = gzipper.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
gzipper.close();
out.close();
String s = new String(out.toByteArray());
Assert.assertEquals(s, msg);
}
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
};
}
}

Просмотреть файл

@ -21,13 +21,17 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.*;
public class TopicPartitionWriterTest {
// TODO: should probably find a better way to mock internal class (FileWriter)...
private File currentDirectory;
private static final String KUSTO_CLUSTER_URL = "https://ingest-cluster.kusto.windows.net";
private static final Boolean COMMIT_IMMEDIATELY = true;
@Before
public final void before() {
@ -59,7 +63,13 @@ public class TopicPartitionWriterTest {
IngestionProperties ingestionProperties = new IngestionProperties(db, table);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = ingestionProperties;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, basePath, fileThreshold, flushInterval);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, COMMIT_IMMEDIATELY, config);
SourceFile descriptor = new SourceFile();
descriptor.rawBytes = 1024;
@ -94,7 +104,13 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
Assert.assertEquals(writer.getFilePath(null), Paths.get(basePath, "kafka_testTopic_11_0.csv.gz").toString());
}
@ -111,7 +127,13 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
writer.open();
List<SinkRecord> records = new ArrayList<>();
@ -137,7 +159,13 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
writer.open();
writer.close();
}
@ -181,7 +209,13 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
writer.open();
@ -213,7 +247,13 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
@ -231,11 +271,11 @@ public class TopicPartitionWriterTest {
Assert.assertEquals(writer.currentOffset, 16);
String currentFileName = writer.fileWriter.currentFile.path;
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 16, IngestionProperties.DATA_FORMAT.csv.name())).toString());
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 15, IngestionProperties.DATA_FORMAT.csv.name())).toString());
// Read
writer.fileWriter.finishFile(false);
Function<SourceFile, String> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
Function<SourceFile, String> assertFileConsumer = FileWriterTest.getAssertFileConsumerFunction(messages[2] + "\n");
assertFileConsumer.apply(writer.fileWriter.currentFile);
writer.close();
}
@ -261,7 +301,13 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
@ -275,7 +321,7 @@ public class TopicPartitionWriterTest {
Assert.assertEquals(writer.currentOffset, 10);
String currentFileName = writer.fileWriter.currentFile.path;
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s", tp.topic(), tp.partition(), 11, IngestionProperties.DATA_FORMAT.avro.name())).toString());
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s", tp.topic(), tp.partition(), 10, IngestionProperties.DATA_FORMAT.avro.name())).toString());
writer.close();
}
}