зеркало из
1
0
Форкнуть 0
This commit is contained in:
Ohad Bitton 2019-10-24 12:57:04 +03:00
Родитель af385348d3
Коммит f022f2cec4
3 изменённых файлов: 17 добавлений и 13 удалений

11
pom.xml
Просмотреть файл

@ -70,9 +70,14 @@
</properties>
<dependencies>
<dependency>
<groupId>com.github.Azure</groupId>
<artifactId>azure-kusto-java</artifactId>
<version>v0.2.0</version>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-data</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-ingest</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>

Просмотреть файл

@ -1,7 +1,8 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.data.KustoConnectionStringBuilder;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@ -18,7 +19,6 @@ import org.slf4j.LoggerFactory;
import java.util.*;
/**
* Kusto sink uses file system to buffer records.
* Every time a file is rolled, we used the kusto client to ingest it.
@ -34,7 +34,6 @@ public class KustoSinkTask extends SinkTask {
private long flushInterval;
private String tempDir;
public KustoSinkTask() {
assignment = new HashSet<>();
writers = new HashMap<>();
@ -46,7 +45,7 @@ public class KustoSinkTask extends SinkTask {
throw new ConfigException("Kusto authentication missing App Key.");
}
KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials(
ConnectionStringBuilder kcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
config.getKustoUrl(),
config.getKustoAuthAppid(),
config.getKustoAuthAppkey(),
@ -61,7 +60,7 @@ public class KustoSinkTask extends SinkTask {
throw new ConfigException("Kusto authentication missing Password.");
}
return IngestClientFactory.createClient(KustoConnectionStringBuilder.createWithAadUserCredentials(
return IngestClientFactory.createClient(ConnectionStringBuilder.createWithAadUserCredentials(
config.getKustoUrl(),
config.getKustoAuthUsername(),
config.getKustoAuthPassword()
@ -97,9 +96,9 @@ public class KustoSinkTask extends SinkTask {
if (mappingRef != null && !mappingRef.isEmpty()) {
if (format != null && format.equals("json")) {
props.setJsonMappingName(mappingRef);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.json);
} else {
props.setCsvMappingName(mappingRef);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.csv);
}
}

Просмотреть файл

@ -231,11 +231,11 @@ public class KustoSinkTaskTest {
// single table mapping should cause all topics to be mapped to a single table
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getDatabaseName(), "db1");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getTableName(), "table1");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getAdditionalProperties().get("format"), "csv");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").getDataFormat(), "csv");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getDatabaseName(), "db2");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getTableName(), "table2");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getAdditionalProperties().get("format"), "json");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getAdditionalProperties().get("jsonMappingReference"), "Mapping");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getDataFormat(), "json");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").getIngestionMapping().getIngestionMappingReference(), "Mapping");
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic3"), null);
}
}