diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java index 4f1a0bf..f2ede72 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java @@ -1,8 +1,8 @@ package com.microsoft.azure.kusto.kafka.connect.sink; -import com.microsoft.azure.kusto.data.ConnectionStringBuilder; import com.microsoft.azure.kusto.data.Client; import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.ConnectionStringBuilder; import com.microsoft.azure.kusto.data.KustoResultSetTable; import com.microsoft.azure.kusto.data.exceptions.DataClientException; import com.microsoft.azure.kusto.data.exceptions.DataServiceException; @@ -25,13 +25,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.Properties; - +import java.util.*; import java.util.logging.Logger; public class E2ETest { @@ -49,14 +43,14 @@ public class E2ETest { private Producer kafkaProducer; @Before - public void setUp(){ - Properties properties = new Properties(); - properties.put("bootstrap.servers", "localhost:9000"); - properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - kafkaProducer = new KafkaProducer<>(properties); - isDlqEnabled = false; - dlqTopicName = null; + public void setUp() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9000"); + properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + kafkaProducer = new KafkaProducer<>(properties); + isDlqEnabled = false; + dlqTopicName = null; } @Test @@ -90,9 +84,10 @@ public class E2ETest { props.ingestionProperties = ingestionProperties; props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv); props.ingestionProperties.setIngestionMapping("mappy", IngestionMapping.IngestionMappingKind.Csv); - String KustoUrl = String.format("https://ingest-%s.kusto.windows.net", cluster); + String kustoDmUrl = String.format("https://ingest-%s.kusto.windows.net", cluster); + String kustoEngineUrl = String.format("https://%s.kusto.windows.net", cluster); String basepath = Paths.get(basePath, "csv").toString(); - Map settings = getKustoConfigs(KustoUrl, basepath, "mappy", fileThreshold, flushInterval); + Map settings = getKustoConfigs(kustoDmUrl, kustoEngineUrl, basepath, "mappy", fileThreshold, flushInterval); KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer); writer.open(); @@ -142,12 +137,13 @@ public class E2ETest { props2.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro); props2.ingestionProperties.setIngestionMapping("avroMapping", IngestionMapping.IngestionMappingKind.Avro); TopicPartition tp2 = new TopicPartition("testPartition2", 11); - String KustoUrl = String.format("https://ingest-%s.kusto.windows.net", cluster); + String kustoDmUrl = String.format("https://ingest-%s.kusto.windows.net", cluster); + String kustoEngineUrl = String.format("https://%s.kusto.windows.net", cluster); String basepath = Paths.get(basePath, "avro").toString(); long fileThreshold = 100; long flushInterval = 300000; - Map settings = getKustoConfigs(KustoUrl, basepath, "avri", fileThreshold, flushInterval); - KustoSinkConfig config= new KustoSinkConfig(settings); + Map settings = getKustoConfigs(kustoDmUrl, kustoEngineUrl, basepath, "avri", fileThreshold, flushInterval); + KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, config, isDlqEnabled, dlqTopicName, kafkaProducer); writer2.open(); List records2 = new ArrayList<>(); @@ -193,10 +189,11 @@ public class E2ETest { this.log.info("Successfully ingested " + expectedNumberOfRows + " records."); } - private Map getKustoConfigs(String clusterUrl, String basePath,String tableMapping, long fileThreshold, - long flushInterval) { + private Map getKustoConfigs(String clusterUrl, String engineUrl, String basePath, String tableMapping, + long fileThreshold, long flushInterval) { Map settings = new HashMap<>(); settings.put(KustoSinkConfig.KUSTO_URL_CONF, clusterUrl); + settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, engineUrl); settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, tableMapping); settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, appId); settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, appKey); diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java index 4671b9a..9a12ec8 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java @@ -58,7 +58,8 @@ public class FileWriterTest { final String FILE_PATH = Paths.get(path, "ABC").toString(); final int MAX_FILE_SIZE = 128; - Consumer trackFiles = (SourceFile f) -> {}; + Consumer trackFiles = (SourceFile f) -> { + }; Function generateFileName = (Long l) -> FILE_PATH; @@ -82,11 +83,9 @@ public class FileWriterTest { File folder = new File(path); boolean mkdirs = folder.mkdirs(); Assert.assertTrue(mkdirs); - Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length); HashMap files = new HashMap<>(); - final int MAX_FILE_SIZE = 100; Consumer trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes); @@ -279,6 +278,7 @@ public class FileWriterTest { protected Map getProperties() { Map settings = new HashMap<>(); settings.put(KustoSinkConfig.KUSTO_URL_CONF, "xxx"); + settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, "xxx"); settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping"); settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid"); settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey"); diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java index 6da5ee8..b0bf9cf 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java @@ -13,8 +13,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; public class KustoSinkConnectorConfigTest { - private static final String ENGINE_URI = "https://cluster_name.kusto.windows.net"; private static final String DM_URI = "https://ingest-cluster_name.kusto.windows.net"; + private static final String ENGINE_URI = "https://cluster_name.kusto.windows.net"; @Test public void shouldAcceptValidConfig() { @@ -42,15 +42,6 @@ public class KustoSinkConnectorConfigTest { new KustoSinkConfig(settings); } - @Test - public void shouldGuessKustoEngineUrlWhenNotGivenPrivateCase() { - HashMap settings = setupConfigs(); - settings.put(KustoSinkConfig.KUSTO_URL_CONF, "https://private-ingest-cluster_name.kusto.windows.net"); - KustoSinkConfig config = new KustoSinkConfig(settings); - String kustoEngineUrl = config.getKustoEngineUrl(); - assertEquals("https://private-cluster_name.kusto.windows.net", kustoEngineUrl); - } - @Test public void shouldUseDmUrlWhenKustoEngineUrlNotGivenAndCantGuess() { HashMap settings = setupConfigs(); @@ -118,6 +109,7 @@ public class KustoSinkConnectorConfigTest { public static HashMap setupConfigs() { HashMap configs = new HashMap<>(); configs.put(KustoSinkConfig.KUSTO_URL_CONF, DM_URI); + configs.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, ENGINE_URI); configs.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]"); configs.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid"); configs.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey"); diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java index 9b8f8a8..dbb2357 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java @@ -11,10 +11,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.junit.Assert; import java.io.ByteArrayOutputStream; import java.io.File; @@ -22,18 +22,15 @@ import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Paths; import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import static org.mockito.Mockito.*; public class TopicPartitionWriterTest { // TODO: should probably find a better way to mock internal class (FileWriter)... private File currentDirectory; - private static final String KUSTO_CLUSTER_URL = "https://ingest-cluster.kusto.windows.net"; + private static final String KUSTO_INGEST_CLUSTER_URL = "https://ingest-cluster.kusto.windows.net"; + private static final String KUSTO_CLUSTER_URL = "https://cluster.kusto.windows.net"; private static final String DATABASE = "testdb1"; private static final String TABLE = "testtable1"; private boolean isDlqEnabled; @@ -76,13 +73,13 @@ public class TopicPartitionWriterTest { TopicIngestionProperties props = new TopicIngestionProperties(); props.ingestionProperties = ingestionProperties; Map settings = getKustoConfigs(basePath, fileThreshold, flushInterval); - KustoSinkConfig config= new KustoSinkConfig(settings); + KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer); SourceFile descriptor = new SourceFile(); descriptor.rawBytes = 1024; descriptor.path = "somepath/somefile"; - descriptor.file = new File ("C://myfile.txt"); + descriptor.file = new File("C://myfile.txt"); writer.handleRollFile(descriptor); ArgumentCaptor fileSourceInfoArgument = ArgumentCaptor.forClass(FileSourceInfo.class); @@ -96,7 +93,7 @@ public class TopicPartitionWriterTest { Assert.assertEquals(fileSourceInfoArgument.getValue().getFilePath(), descriptor.path); Assert.assertEquals(TABLE, ingestionPropertiesArgumentCaptor.getValue().getTableName()); Assert.assertEquals(DATABASE, ingestionPropertiesArgumentCaptor.getValue().getDatabaseName()); - Assert.assertEquals(fileSourceInfoArgument.getValue().getRawSizeInBytes(), 1024); + Assert.assertEquals(1024, fileSourceInfoArgument.getValue().getRawSizeInBytes()); } @Test @@ -111,7 +108,7 @@ public class TopicPartitionWriterTest { props.ingestionProperties = new IngestionProperties(DATABASE, TABLE); props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv); Map settings = getKustoConfigs(basePath, fileThreshold, flushInterval); - KustoSinkConfig config= new KustoSinkConfig(settings); + KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer); Assert.assertEquals(writer.getFilePath(null), Paths.get(config.getTempDirPath(), "kafka_testTopic_11_0.csv.gz").toString()); @@ -128,7 +125,7 @@ public class TopicPartitionWriterTest { props.ingestionProperties = new IngestionProperties(DATABASE, TABLE); props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv); Map settings = getKustoConfigs(basePath, fileThreshold, flushInterval); - KustoSinkConfig config= new KustoSinkConfig(settings); + KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer); writer.open(); List records = new ArrayList<>(); @@ -155,7 +152,7 @@ public class TopicPartitionWriterTest { props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv); Map settings = getKustoConfigs(basePath, fileThreshold, flushInterval); - KustoSinkConfig config= new KustoSinkConfig(settings); + KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer); writer.open(); writer.close(); @@ -188,7 +185,7 @@ public class TopicPartitionWriterTest { } @Test - public void testWriteStringyValuesAndOffset() throws Exception { + public void testWriteStringyValuesAndOffset() { TopicPartition tp = new TopicPartition("testTopic", 2); IngestClient mockClient = mock(IngestClient.class); String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString(); @@ -199,11 +196,11 @@ public class TopicPartitionWriterTest { props.ingestionProperties = new IngestionProperties(DATABASE, TABLE); props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv); Map settings = getKustoConfigs(basePath, fileThreshold, flushInterval); - KustoSinkConfig config= new KustoSinkConfig(settings); + KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer); writer.open(); - List records = new ArrayList(); + List records = new ArrayList<>(); records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, "another,stringy,message", 3)); records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, "{'also':'stringy','sortof':'message'}", 4)); @@ -221,7 +218,7 @@ public class TopicPartitionWriterTest { TopicPartition tp = new TopicPartition("testPartition", 11); IngestClient mockClient = mock(IngestClient.class); String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString(); - String[] messages = new String[]{ "stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"}; + String[] messages = new String[]{"stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"}; // Expect to finish file after writing forth message cause of fileThreshold long fileThreshold = messages[0].length() + messages[1].length() + messages[2].length() + messages[2].length() - 1; @@ -230,11 +227,11 @@ public class TopicPartitionWriterTest { props.ingestionProperties = new IngestionProperties(DATABASE, TABLE); props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv); Map settings = getKustoConfigs(basePath, fileThreshold, flushInterval); - KustoSinkConfig config= new KustoSinkConfig(settings); + KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer); writer.open(); - List records = new ArrayList(); + List records = new ArrayList<>(); records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[0], 10)); records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[1], 13)); records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[2], 14)); @@ -245,8 +242,8 @@ public class TopicPartitionWriterTest { writer.writeRecord(record); } - Assert.assertEquals((long) writer.lastCommittedOffset, (long) 15); - Assert.assertEquals(writer.currentOffset, 16); + Assert.assertEquals(15, (long) writer.lastCommittedOffset); + Assert.assertEquals(16, writer.currentOffset); String currentFileName = writer.fileWriter.currentFile.path; Assert.assertEquals(currentFileName, Paths.get(config.getTempDirPath(), String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 15, IngestionProperties.DATA_FORMAT.csv.name())).toString()); @@ -277,19 +274,19 @@ public class TopicPartitionWriterTest { props.ingestionProperties = new IngestionProperties(DATABASE, TABLE); props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro); Map settings = getKustoConfigs(basePath, fileThreshold, flushInterval); - KustoSinkConfig config= new KustoSinkConfig(settings); + KustoSinkConfig config = new KustoSinkConfig(settings); TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer); writer.open(); - List records = new ArrayList(); + List records = new ArrayList<>(); records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.BYTES_SCHEMA, o.toByteArray(), 10)); for (SinkRecord record : records) { writer.writeRecord(record); } - Assert.assertEquals((long) writer.lastCommittedOffset, (long) 10); - Assert.assertEquals(writer.currentOffset, 10); + Assert.assertEquals(10, (long) writer.lastCommittedOffset); + Assert.assertEquals(10, writer.currentOffset); String currentFileName = writer.fileWriter.currentFile.path; @@ -300,7 +297,8 @@ public class TopicPartitionWriterTest { private Map getKustoConfigs(String basePath, long fileThreshold, long flushInterval) { Map settings = new HashMap<>(); - settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL); + settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_INGEST_CLUSTER_URL); + settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, KUSTO_CLUSTER_URL); settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping"); settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid"); settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");