diff --git a/pom.xml b/pom.xml index b3e9ee5..0625be0 100644 --- a/pom.xml +++ b/pom.xml @@ -70,9 +70,14 @@ - com.github.Azure - azure-kusto-java - v0.2.0 + com.microsoft.azure.kusto + kusto-data + 1.1.0 + + + com.microsoft.azure.kusto + kusto-ingest + 1.1.0 org.testng diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java index 66f6fa1..3d2dad6 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java @@ -1,7 +1,8 @@ package com.microsoft.azure.kusto.kafka.connect.sink; -import com.microsoft.azure.kusto.data.KustoConnectionStringBuilder; +import com.microsoft.azure.kusto.data.ConnectionStringBuilder; import com.microsoft.azure.kusto.ingest.IngestClient; +import com.microsoft.azure.kusto.ingest.IngestionMapping; import com.microsoft.azure.kusto.ingest.IngestionProperties; import com.microsoft.azure.kusto.ingest.IngestClientFactory; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -18,7 +19,6 @@ import org.slf4j.LoggerFactory; import java.util.*; - /** * Kusto sink uses file system to buffer records. * Every time a file is rolled, we used the kusto client to ingest it. @@ -34,7 +34,6 @@ public class KustoSinkTask extends SinkTask { private long flushInterval; private String tempDir; - public KustoSinkTask() { assignment = new HashSet<>(); writers = new HashMap<>(); @@ -46,7 +45,7 @@ public class KustoSinkTask extends SinkTask { throw new ConfigException("Kusto authentication missing App Key."); } - KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials( + ConnectionStringBuilder kcsb = ConnectionStringBuilder.createWithAadApplicationCredentials( config.getKustoUrl(), config.getKustoAuthAppid(), config.getKustoAuthAppkey(), @@ -61,7 +60,7 @@ public class KustoSinkTask extends SinkTask { throw new ConfigException("Kusto authentication missing Password."); } - return IngestClientFactory.createClient(KustoConnectionStringBuilder.createWithAadUserCredentials( + return IngestClientFactory.createClient(ConnectionStringBuilder.createWithAadUserCredentials( config.getKustoUrl(), config.getKustoAuthUsername(), config.getKustoAuthPassword() @@ -97,9 +96,9 @@ public class KustoSinkTask extends SinkTask { if (mappingRef != null && !mappingRef.isEmpty()) { if (format != null && format.equals("json")) { - props.setJsonMappingName(mappingRef); + props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.json); } else { - props.setCsvMappingName(mappingRef); + props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.csv); } } diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java index 7cab7f0..f06a782 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java @@ -231,11 +231,11 @@ public class KustoSinkTaskTest { // single table mapping should cause all topics to be mapped to a single table Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getDatabaseName(), "db1"); Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getTableName(), "table1"); - Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getAdditionalProperties().get("format"), "csv"); + Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getDataFormat(), "csv"); Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getDatabaseName(), "db2"); Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getTableName(), "table2"); - Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getAdditionalProperties().get("format"), "json"); - Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getAdditionalProperties().get("jsonMappingReference"), "Mapping"); + Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getDataFormat(), "json"); + Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getIngestionMapping().getIngestionMappingReference(), "Mapping"); Assert.assertEquals(kustoSinkTask.getIngestionProps("topic3"), null); } }