From f73579d75c780820441e458f1165dd77a700b92a Mon Sep 17 00:00:00 2001 From: fahad Date: Sun, 21 Jun 2020 21:50:05 +0530 Subject: [PATCH] Fixed unit and E2E tests --- .../kusto/kafka/connect/sink/KustoSinkConfig.java | 12 +++++++++--- .../azure/kusto/kafka/connect/sink/E2ETest.java | 4 ++-- .../kusto/kafka/connect/sink/FileWriterTest.java | 14 ++++++++------ .../connect/sink/KustoSinkConnectorConfigTest.java | 10 ++++++---- .../connect/sink/TopicPartitionWriterTest.java | 14 +++++++------- 5 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java index 058ae77..2b42008 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java @@ -6,6 +6,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.kafka.common.config.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.util.Strings; @@ -129,6 +130,11 @@ public class KustoSinkConfig extends AbstractConfig { public KustoSinkConfig(Map parsedConfig) { this(getConfig(), parsedConfig); + + if (Strings.isNullOrEmpty(getTopicToTableMapping())) { + throw new ConfigException("Missing 'kusto.tables.topics.mapping' configuration, " + + "please configure it to appropriate value."); + } } public static ConfigDef getConfig() { @@ -147,7 +153,7 @@ public class KustoSinkConfig extends AbstractConfig { defineConnectionConfigs(connectionGroupName, connectionGroupOrder, result); defineWriteConfigs(writeGroupName, writeGroupOrder, result); defineErrorHandlingAndRetriesConfgis(errorAndRetriesGroupName, errorAndRetriesGroupOrder, result); - + return result; } @@ -215,7 +221,7 @@ public class KustoSinkConfig extends AbstractConfig { .define( KUSTO_TABLES_MAPPING_CONF, Type.STRING, - ConfigDef.NO_DEFAULT_VALUE, + "", Importance.HIGH, KUSTO_TABLES_MAPPING_DOC, writeGroupName, @@ -225,7 +231,7 @@ public class KustoSinkConfig extends AbstractConfig { .define( KUSTO_TABLES_MAPPING_CONF_DEPRECATED, Type.STRING, - ConfigDef.NO_DEFAULT_VALUE, + "", Importance.HIGH, KUSTO_TABLES_MAPPING_DOC + DEPRECATED_CONFIG_DOC, writeGroupName, diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java index 9e62639..ca91b57 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java @@ -77,7 +77,7 @@ public class E2ETest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, config); writer.open(); List records = new ArrayList(); @@ -131,7 +131,7 @@ public class E2ETest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(100)); settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(300000)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, config); writer2.open(); List records2 = new ArrayList(); diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java index decf2ad..41213c5 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java @@ -1,6 +1,8 @@ package com.microsoft.azure.kusto.kafka.connect.sink; import com.google.common.base.Function; +import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.kafka.connect.data.Schema; @@ -58,7 +60,7 @@ public class FileWriterTest { Function generateFileName = (Long l) -> FILE_PATH; - FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock()); + FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL); fileWriter.openFile(null); @@ -88,7 +90,7 @@ public class FileWriterTest { Function generateFileName = (Long l) -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz"; - FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock()); + FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL); for (int i = 0; i < 9; i++) { String msg = String.format("Line number %d : This is a message from the other size", i); @@ -128,7 +130,7 @@ public class FileWriterTest { Function generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz"; // Expect no files to be ingested as size is small and flushInterval is big - FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock()); + FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL); String msg = "Message"; SinkRecord record = new SinkRecord("TestTopic", 1, null, null, null, msg, 1); @@ -147,7 +149,7 @@ public class FileWriterTest { Function generateFileName2 = (Long l) -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString(); // Expect one file to be ingested as flushInterval had changed - FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock()); + FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL); String msg2 = "Second Message"; SinkRecord record2 = new SinkRecord("TestTopic", 1, null, null, null, msg2, 1); @@ -197,7 +199,7 @@ public class FileWriterTest { } return Paths.get(path, Long.toString(offset)).toString(); }; - FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock); + FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock, BehaviorOnError.FAIL); String msg2 = "Second Message"; reentrantReadWriteLock.readLock().lock(); long recordOffset = 1; @@ -261,7 +263,7 @@ public class FileWriterTest { Function generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz"; // Expect no files to be ingested as size is small and flushInterval is big - FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock()); + FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL); gzipOutputStream.write(msg.getBytes()); gzipOutputStream.finish(); diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java index a888810..ce8d8a7 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java @@ -30,6 +30,7 @@ public class KustoSinkConnectorConfigTest { public void shouldAcceptValidConfig() { // Adding required Configuration with no default value. settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url"); + settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping"); config = new KustoSinkConfig(settings); assertNotNull(config); } @@ -38,9 +39,9 @@ public class KustoSinkConnectorConfigTest { public void shouldHaveDefaultValues() { // Adding required Configuration with no default value. settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url"); + settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping"); config = new KustoSinkConfig(settings); assertNotNull(config.getKustoUrl()); - assertNull(config.getTopicToTableMapping()); assertNotNull(config.getFlushSizeBytes()); assertNotNull(config.getFlushInterval()); assertFalse(config.isDlqEnabled()); @@ -58,7 +59,7 @@ public class KustoSinkConnectorConfigTest { public void shouldFailWhenErrorToleranceIncorrectlyConfigured() { // Adding required Configuration with no default value. settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url"); - + settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping"); settings.put(KustoSinkConfig.KUSTO_BEHAVIOR_ON_ERROR_CONF, "DummyValue"); config = new KustoSinkConfig(settings); } @@ -66,13 +67,14 @@ public class KustoSinkConnectorConfigTest { @Test public void verifyDlqSettings() { settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url"); + settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping"); settings.put(KustoSinkConfig.KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF, "localhost:8081,localhost:8082"); - //settings.put(KustoSinkConfig.CONNECTOR_NAME_CONF, "KustoConnectorTest"); + settings.put(KustoSinkConfig.KUSTO_DLQ_TOPIC_NAME_CONF, "dlq-error-topic"); config = new KustoSinkConfig(settings); assertTrue(config.isDlqEnabled()); assertEquals(Arrays.asList("localhost:8081", "localhost:8082"), config.getDlqBootstrapServers()); - //assertEquals("KustoConnectorTest-error", config.getDlqTopicName()); + assertEquals("dlq-error-topic", config.getDlqTopicName()); } } diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java index 5d61133..9824c2a 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java @@ -69,7 +69,7 @@ public class TopicPartitionWriterTest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold)); settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, config); SourceFile descriptor = new SourceFile(); descriptor.rawBytes = 1024; @@ -110,7 +110,7 @@ public class TopicPartitionWriterTest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold)); settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config); Assert.assertEquals(writer.getFilePath(null), Paths.get(basePath, "kafka_testTopic_11_0.csv.gz").toString()); } @@ -133,7 +133,7 @@ public class TopicPartitionWriterTest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold)); settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config); writer.open(); List records = new ArrayList<>(); @@ -165,7 +165,7 @@ public class TopicPartitionWriterTest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold)); settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config); writer.open(); writer.close(); } @@ -215,7 +215,7 @@ public class TopicPartitionWriterTest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold)); settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config); writer.open(); @@ -253,7 +253,7 @@ public class TopicPartitionWriterTest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold)); settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config); writer.open(); List records = new ArrayList(); @@ -307,7 +307,7 @@ public class TopicPartitionWriterTest { settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold)); settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval)); KustoSinkConfig config= new KustoSinkConfig(settings); - TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config); + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config); writer.open(); List records = new ArrayList();