From 1ddad12e05870453086a1790615582a0d49254be Mon Sep 17 00:00:00 2001 From: Daniel Dubovski Date: Sun, 2 Sep 2018 18:00:41 +0300 Subject: [PATCH] initial draft with todo's to next release --- connect-kusto-sink.properties | 23 ++ pom.xml | 132 ++++++++ .../connect/sink/GZIPFileDescriptor.java | 11 + .../kafka/connect/sink/GZIPFileWriter.java | 132 ++++++++ .../kafka/connect/sink/KustoSinkConfig.java | 105 ++++++ .../connect/sink/KustoSinkConnector.java | 65 ++++ .../kafka/connect/sink/KustoSinkTask.java | 207 ++++++++++++ .../connect/sink/TopicPartitionWriter.java | 94 ++++++ .../kusto/kafka/connect/sink/Version.java | 29 ++ .../sink/client/AadAuthenticationHelper.java | 88 +++++ .../sink/client/AzureStorageHelper.java | 144 +++++++++ .../connect/sink/client/BlobDescription.java | 49 +++ .../sink/client/CslCommandGenerator.java | 19 ++ .../sink/client/IKustoIngestionResult.java | 14 + .../IngestFromMultipleBlobsCallable.java | 79 +++++ .../sink/client/IngestionBlobInfo.java | 30 ++ .../sink/client/IngestionErrorCode.java | 218 +++++++++++++ .../sink/client/IngestionFailureInfo.java | 35 ++ .../connect/sink/client/IngestionStatus.java | 185 +++++++++++ .../IngestionStatusInTableDescription.java | 7 + .../connect/sink/client/KustoClient.java | 42 +++ .../KustoClientAggregateException.java | 15 + .../KustoClientException.java | 20 ++ .../KustoWebException.java | 18 ++ .../client/KustoConnectionStringBuilder.java | 66 ++++ .../sink/client/KustoIngestClient.java | 222 +++++++++++++ .../sink/client/KustoIngestionProperties.java | 113 +++++++ .../connect/sink/client/KustoResults.java | 39 +++ .../connect/sink/client/OperationStatus.java | 39 +++ .../connect/sink/client/ResourceManager.java | 178 +++++++++++ .../kafka/connect/sink/client/Sample.java | 128 ++++++++ .../sink/client/SampleFileIngestion.java | 31 ++ .../TableReportKustoIngestionResult.java | 33 ++ .../kafka/connect/sink/client/Utils.java | 85 +++++ .../azure-kusto-kafka-sink-version.properties | 1 + .../connect/sink/GZIPFileWriterTest.java | 117 +++++++ .../kafka/connect/sink/KustoSinkTaskTest.java | 300 ++++++++++++++++++ .../sink/TopicPartitionWriterTest.java | 210 ++++++++++++ 38 files changed, 3323 insertions(+) create mode 100644 connect-kusto-sink.properties create mode 100644 pom.xml create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileDescriptor.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileWriter.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnector.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/Version.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/AadAuthenticationHelper.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/AzureStorageHelper.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/BlobDescription.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/CslCommandGenerator.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IKustoIngestionResult.java create mode 100644 
src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestFromMultipleBlobsCallable.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionBlobInfo.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionErrorCode.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionFailureInfo.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionStatus.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionStatusInTableDescription.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClient.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoClientAggregateException.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoClientException.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoWebException.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoConnectionStringBuilder.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoIngestClient.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoIngestionProperties.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoResults.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/OperationStatus.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/ResourceManager.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/Sample.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/SampleFileIngestion.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/TableReportKustoIngestionResult.java create mode 100644 src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/Utils.java create mode 100644 src/main/resources/azure-kusto-kafka-sink-version.properties create mode 100644 src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileWriterTest.java create mode 100644 src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java create mode 100644 src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java diff --git a/connect-kusto-sink.properties b/connect-kusto-sink.properties new file mode 100644 index 0000000..37fff8b --- /dev/null +++ b/connect-kusto-sink.properties @@ -0,0 +1,23 @@ +connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector +# Name of the connector +name=azure-kusto-sink +# Maximum number of tasks that should be created for this connector. 
+tasks.max=1
+topics=logs
+key.ignore=true
+schema.ignore=true
+# Kusto cluster, database and table to which messages from the above topics are sent
+#kusto.url=https://{cluster_name}.kusto.windows.net
+#kusto.db={db_name}
+#kusto.table={table_name}
+#kusto.tables.topics_mapping=logs:logs_table;
+# 10 MB (the value must be a literal number of bytes; arithmetic is not evaluated)
+#kusto.sink.flush_size=10485760
+# 10 minutes, in milliseconds
+#kusto.sink.flush_interval_ms=600000
+#kusto.auth.username={my_username}
+#kusto.auth.password={my_password}
+# appid/secret
+#kusto.auth.appid={my_app_id}
+#kusto.auth.appkey={my_app_key}
+#kusto.auth.authority=
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..aacb935
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.microsoft.azure</groupId>
+    <artifactId>azure-kusto-kafka</artifactId>
+    <packaging>jar</packaging>
+    <version>0.0.1</version>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+    </build>
+
+    <name>Azure Kusto Kafka Sink</name>
+
+    <licenses>
+        <license>
+            <name>The MIT License (MIT)</name>
+            <url>http://opensource.org/licenses/MIT</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <scm>
+        <url>scm:git:https://github.com/Azure/kafka-sink-azure-kusto</url>
+        <connection>scm:git:git://github.com/Azure/kafka-sink-azure-kusto.git</connection>
+    </scm>
+
+    <developers>
+        <developer>
+            <id>microsoft</id>
+            <name>Microsoft</name>
+        </developer>
+    </developers>
+
+    <properties>
+        <kafka.version>1.0.0</kafka.version>
+        <json.version>20090211</json.version>
+        <azureStorage.version>8.0.0</azureStorage.version>
+        <azureAdal.version>1.6.0</azureAdal.version>
+        <commonio.version>2.6</commonio.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>6.14.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-storage</artifactId>
+            <version>${azureStorage.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>adal4j</artifactId>
+            <version>${azureAdal.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-json</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+            <version>${json.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commonio.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>2.0.2-beta</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>RELEASE</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileDescriptor.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileDescriptor.java
new file mode 100644
index 0000000..1c3adee
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileDescriptor.java
@@ -0,0 +1,11 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+import java.io.File;
+
+public class GZIPFileDescriptor {
+    long rawBytes = 0;
+    long zippedBytes = 0;
+    long numRecords = 0;
+    public String path;
+    public File file;
+}
\ No newline at end of file
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileWriter.java
new file mode 100644
index 0000000..227062f
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileWriter.java
@@ -0,0 +1,132 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+
+import java.io.*;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.zip.GZIPOutputStream;
+
+
+public class GZIPFileWriter implements Closeable {
+    // callbacks
+    private Consumer<GZIPFileDescriptor> onRollCallback;
+    private Supplier<String> getFilePath;
+
+    public GZIPFileDescriptor currentFile;
+    private GZIPOutputStream gzipStream;
+    private String basePath;
+    private CountingOutputStream fileStream;
+    private long fileThreshold;
+
+    public GZIPFileWriter(String basePath, long fileThreshold,
+                          Consumer<GZIPFileDescriptor> onRollCallback, Supplier<String> getFilePath) {
+        this.getFilePath = getFilePath;
+        this.basePath = basePath;
+        this.fileThreshold = fileThreshold;
+        this.onRollCallback = onRollCallback;
+    }
+
+    public boolean isDirty() {
+        return this.currentFile != null && this.currentFile.rawBytes > 0;
+    }
+
+    public synchronized void write(byte[] data) throws IOException {
+        if (data == null || data.length == 0) return;
+
+        if (currentFile == null) {
+            openFile();
+        }
+
+        if ((currentFile.rawBytes + data.length) > fileThreshold) {
+            rotate();
+        }
+
+        gzipStream.write(data);
+
+        currentFile.rawBytes += data.length;
+        // CountingOutputStream already tracks the running total for this file, so assign rather than accumulate
+        currentFile.zippedBytes = fileStream.numBytes;
+        currentFile.numRecords++;
+    }
+
+    public void openFile() throws IOException {
+        GZIPFileDescriptor fileDescriptor = new GZIPFileDescriptor();
+
+        File folder = new File(basePath);
+        if (!folder.exists() && !folder.mkdirs()) {
+            throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
+        }
+
+        String filePath = getFilePath.get() + ".gz";
+        fileDescriptor.path = filePath;
+
+        File file = new File(filePath);
+
+        file.createNewFile();
+        FileOutputStream fos = new FileOutputStream(file);
+        fos.getChannel().truncate(0);
+
+        fileStream = new CountingOutputStream(fos);
+        gzipStream = new GZIPOutputStream(fileStream);
+
+        fileDescriptor.file = file;
+
+        currentFile = fileDescriptor;
+    }
+
+    private void rotate() throws IOException {
+        finishFile();
+        openFile();
+    }
+
+    private void finishFile() throws IOException {
+        gzipStream.finish();
+
+        onRollCallback.accept(currentFile);
+        // closing late so that the success callback will have a chance to use the file.
+        gzipStream.close();
+        currentFile.file.delete();
+    }
+
+    public void rollback() throws IOException {
+        if (gzipStream != null) {
+            gzipStream.close();
+            if (currentFile != null && currentFile.file != null) {
+                currentFile.file.delete();
+            }
+        }
+    }
+
+    public void close() throws IOException {
+        // Flush and roll the last file; finishFile() already closes the underlying gzip stream
+        finishFile();
+    }
+
+
+    private class CountingOutputStream extends FilterOutputStream {
+        private long numBytes = 0;
+
+        CountingOutputStream(OutputStream out) throws IOException {
+            super(out);
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+            this.numBytes++;
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            out.write(b);
+            this.numBytes += b.length;
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            out.write(b, off, len);
+            this.numBytes += len;
+        }
+    }
+}
+
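For context, a minimal usage sketch of the writer above (hypothetical path and threshold; the roll callback fires before the temp file is deleted, so it may still read the .gz file):

    GZIPFileWriter writer = new GZIPFileWriter(
            "/tmp/kusto-sink",                    // hypothetical temp dir
            10 * 1024 * 1024,                     // roll after ~10 MB of raw (uncompressed) bytes
            (GZIPFileDescriptor rolled) -> System.out.printf(
                    "rolled %s: %d records, %d raw bytes, %d zipped bytes%n",
                    rolled.path, rolled.numRecords, rolled.rawBytes, rolled.zippedBytes),
            () -> "/tmp/kusto-sink/kafka_mytopic_0_42"); // next file name; ".gz" is appended
    writer.write("a record\n".getBytes(StandardCharsets.UTF_8));
    writer.close(); // finishes and rolls the last file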
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java
new file mode 100644
index 0000000..bae0e18
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java
@@ -0,0 +1,105 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+public class KustoSinkConfig extends AbstractConfig {
+    // TODO: this might need to be per kusto cluster...
+    static final String KUSTO_URL = "kusto.url";
+    static final String KUSTO_DB = "kusto.db";
+    static final String KUSTO_TABLE = "kusto.table";
+    static final String KUSTO_TABLES_MAPPING = "kusto.tables.topics_mapping";
+
+    static final String KUSTO_AUTH_USERNAME = "kusto.auth.username";
+    static final String KUSTO_AUTH_PASSWORD = "kusto.auth.password";
+
+    static final String KUSTO_AUTH_APPID = "kusto.auth.appid";
+    static final String KUSTO_AUTH_APPKEY = "kusto.auth.appkey";
+    static final String KUSTO_AUTH_AUTHORITY = "kusto.auth.authority";
+
+    static final String KUSTO_SINK_TEMPDIR = "kusto.sink.tempdir";
+    static final String KUSTO_SINK_FLUSH_SIZE = "kusto.sink.flush_size";
+    static final String KUSTO_SINK_FLUSH_INTERVAL_MS = "kusto.sink.flush_interval_ms";
+
+    public KustoSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
+        super(config, parsedConfig);
+    }
+
+    public KustoSinkConfig(Map<String, String> parsedConfig) {
+        this(getConfig(), parsedConfig);
+    }
+
+    public static ConfigDef getConfig() {
+        return new ConfigDef()
+                .define(KUSTO_URL, Type.STRING, Importance.HIGH, "Kusto cluster url")
+                .define(KUSTO_DB, Type.STRING, Importance.HIGH, "Kusto target database name")
+                .define(KUSTO_TABLE, Type.STRING, null, Importance.HIGH, "Kusto target table (if a per-topic mapping is required, use `kusto.tables.topics_mapping` instead)")
+                .define(KUSTO_TABLES_MAPPING, Type.STRING, null, Importance.HIGH, "Kusto target tables mapping (per topic mapping, 'topic1:table1;topic2:table2;')")
+                .define(KUSTO_AUTH_USERNAME, Type.STRING, null, Importance.HIGH, "Kusto auth using username,password combo: username")
+                .define(KUSTO_AUTH_PASSWORD, Type.STRING, null, Importance.HIGH, "Kusto auth using username,password combo: password")
+                .define(KUSTO_AUTH_APPID, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: app id")
+                .define(KUSTO_AUTH_APPKEY, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: app key")
+                .define(KUSTO_AUTH_AUTHORITY, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: authority")
+                .define(KUSTO_SINK_TEMPDIR, Type.STRING, System.getProperty("java.io.tmpdir"), Importance.LOW, "Temp dir that will be used by kusto sink to buffer records. Defaults to the system temp dir.")
+                .define(KUSTO_SINK_FLUSH_SIZE, Type.LONG, FileUtils.ONE_MB, Importance.HIGH, "Kusto sink max buffer size (per topic+partition combo)")
+                .define(KUSTO_SINK_FLUSH_INTERVAL_MS, Type.LONG, TimeUnit.MINUTES.toMillis(5), Importance.HIGH, "Kusto sink max staleness in milliseconds (per topic+partition combo)");
+    }
+
+    public String getKustoUrl() {
+        return this.getString(KUSTO_URL);
+    }
+
+    public String getKustoDb() {
+        return this.getString(KUSTO_DB);
+    }
+
+    public String getKustoTable() {
+        return this.getString(KUSTO_TABLE);
+    }
+
+    public String getKustoTopicToTableMapping() {
+        return this.getString(KUSTO_TABLES_MAPPING);
+    }
+
+    public String getKustoAuthUsername() {
+        return this.getString(KUSTO_AUTH_USERNAME);
+    }
+
+    public String getKustoAuthPassword() {
+        return this.getString(KUSTO_AUTH_PASSWORD);
+    }
+
+    public String getKustoAuthAppid() {
+        return this.getString(KUSTO_AUTH_APPID);
+    }
+
+    public String getKustoAuthAppkey() {
+        return this.getString(KUSTO_AUTH_APPKEY);
+    }
+
+    public String getKustoAuthAuthority() {
+        return this.getString(KUSTO_AUTH_AUTHORITY);
+    }
+
+    public long getKustoFlushSize() {
+        return this.getLong(KUSTO_SINK_FLUSH_SIZE);
+    }
+
+    public String getKustoSinkTempDir() {
+        return this.getString(KUSTO_SINK_TEMPDIR);
+    }
+
+    public long getKustoFlushIntervalMS() {
+        // defined as Type.LONG above, so it must be read with getLong (getInt would throw at runtime)
+        return this.getLong(KUSTO_SINK_FLUSH_INTERVAL_MS);
+    }
+}
+
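A sketch of populating this config programmatically with hypothetical values (the key constants are package-visible):

    Map<String, String> props = new HashMap<>();
    props.put(KustoSinkConfig.KUSTO_URL, "https://mycluster.kusto.windows.net");
    props.put(KustoSinkConfig.KUSTO_DB, "mydb");
    props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "logs:logs_table;metrics:metrics_table");
    props.put(KustoSinkConfig.KUSTO_AUTH_APPID, "some-app-id");
    props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY, "some-app-key");
    KustoSinkConfig config = new KustoSinkConfig(props);
    long flushSize = config.getKustoFlushSize(); // FileUtils.ONE_MB unless overridden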
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnector.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnector.java
new file mode 100644
index 0000000..b5e34d4
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnector.java
@@ -0,0 +1,65 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class KustoSinkConnector extends SinkConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(KustoSinkConnector.class);
+    private Map<String, String> configProps;
+
+    // Connect instantiates connectors reflectively, so a no-arg constructor is required
+    public KustoSinkConnector() {
+    }
+
+    KustoSinkConnector(KustoSinkConfig config) {
+    }
+
+    @Override
+    public String version() {
+        return Version.getVersion();
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return KustoSinkTask.class;
+    }
+
+
+    @Override
+    public void start(Map<String, String> props) throws ConnectException {
+        try {
+            configProps = new HashMap<>(props);
+        } catch (ConfigException e) {
+            throw new ConnectException("Couldn't start KustoSinkConnector due to configuration error", e);
+        }
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // every task gets the same configuration; the framework balances partitions across tasks
+        Map<String, String> taskProps = new HashMap<>(configProps);
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+
+        for (int i = 0; i < maxTasks; i++) {
+            taskConfigs.add(taskProps);
+        }
+
+        return taskConfigs;
+    }
+
+    @Override
+    public void stop() throws ConnectException {
+        log.info("Shutting down KustoSinkConnector");
+    }
+
+    @Override
+    public ConfigDef config() {
+        return KustoSinkConfig.getConfig();
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java
new file mode 100644
index 0000000..1fcb516
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java
@@ -0,0 +1,207 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+//import com.microsoft.azure.kusto.kafka.connect.source.JsonSerialization
+// FIXME: need to consume this package via maven once setup properly
+//import com.microsoft.azure.sdk.kusto.ingest.KustoIngestClient;
+
+import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoConnectionStringBuilder;
+import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestClient;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+public class KustoSinkTask extends SinkTask {
+    static final String TOPICS_WILDCARD = "*";
+    private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
+    private final Set<TopicPartition> assignment;
+    Map<String, String> topicsToTables;
+    KustoIngestClient kustoIngestClient;
+    String databaseName;
+    Map<TopicPartition, TopicPartitionWriter> writers;
+    private Long maxFileSize;
+    private String tempDir;
+
+
+    public KustoSinkTask() {
+        assignment = new HashSet<>();
+        writers = new HashMap<>();
+    }
+
+    public static KustoIngestClient createKustoIngestClient(KustoSinkConfig config) throws Exception {
+        if (config.getKustoAuthAppid() != null) {
+            if (config.getKustoAuthAppkey() == null) {
+                throw new ConfigException("Kusto authentication missing App Key.");
+            }
+            return new KustoIngestClient(KustoConnectionStringBuilder.createWithAadApplicationCredentials(
+                    config.getKustoUrl(),
+                    //todo: should replace with proper initialization
+                    config.getKustoAuthAppid(),
+                    config.getKustoAuthAppkey(),
+                    config.getKustoAuthAuthority()
+            ));
+        }
+
+        if (config.getKustoAuthUsername() != null) {
+            if (config.getKustoAuthPassword() == null) {
+                throw new ConfigException("Kusto authentication missing Password.");
+            }
+
+            return new KustoIngestClient(KustoConnectionStringBuilder.createWithAadUserCredentials(
+                    config.getKustoUrl(),
+                    //todo: should replace with proper initialization
+                    config.getKustoAuthUsername(),
+                    config.getKustoAuthPassword()
+            ));
+        }
+
+        throw new ConfigException("Kusto authentication method must be provided.");
+    }
+
+    public static Map<String, String> getTopicsToTables(KustoSinkConfig config) {
+        Map<String, String> result = new HashMap<>();
+
+        if (config.getKustoTable() != null) {
+            result.put(TOPICS_WILDCARD, config.getKustoTable());
+            return result;
+        }
+
+        if (config.getKustoTopicToTableMapping() != null) {
+            String[] mappings = config.getKustoTopicToTableMapping().split(";");
+
+            for (String mapping : mappings) {
+                String[] kvp = mapping.split(":");
+                if (kvp.length != 2) {
+                    throw new ConfigException("Provided table mapping is malformed. Please make sure the mapping is of the 'topicName:tableName;' format.");
+                }
+
+                result.put(kvp[0], kvp[1]);
+            }
+
+            return result;
+        }
+
+        throw new ConfigException("Kusto table mapping must be provided.");
+    }
+
+    public String getTable(String topic) {
+        if (topicsToTables.containsKey(TOPICS_WILDCARD)) {
+            return topicsToTables.get(TOPICS_WILDCARD);
+        }
+
+        return topicsToTables.get(topic);
+    }
+
+    @Override
+    public String version() {
+        return Version.getVersion();
+    }
+
+    @Override
+    public void open(Collection<TopicPartition> partitions) throws ConnectException {
+        assignment.addAll(partitions);
+
+        for (TopicPartition tp : assignment) {
+            String table = getTable(tp.topic());
+
+            if (table == null) {
+                throw new ConnectException(String.format("Kusto Sink has no table mapped for the topic: %s. Please check your configuration.", tp.topic()));
+            } else {
+                TopicPartitionWriter writer = new TopicPartitionWriter(tp, kustoIngestClient, databaseName, table, tempDir, maxFileSize);
+
+                writer.open();
+                writers.put(tp, writer);
+            }
+        }
+    }
+
+    @Override
+    public void close(Collection<TopicPartition> partitions) {
+        for (TopicPartition tp : assignment) {
+            try {
+                writers.get(tp).close();
+            } catch (ConnectException e) {
+                log.error("Error closing writer for {}. Error: {}", tp, e.getMessage());
+            }
+        }
+
+        writers.clear();
+        assignment.clear();
+    }
+
+
+    @Override
+    public void start(Map<String, String> props) throws ConnectException {
+        try {
+            KustoSinkConfig config = new KustoSinkConfig(props);
+
+            databaseName = config.getKustoDb();
+
+            topicsToTables = getTopicsToTables(config);
+            // this should be read properly from settings
+            kustoIngestClient = createKustoIngestClient(config);
+            tempDir = config.getKustoSinkTempDir();
+            maxFileSize = config.getKustoFlushSize();
+
+            open(context.assignment());
+
+        } catch (ConfigException ex) {
+            throw new ConnectException(String.format("Kusto Connector failed to start due to configuration error. %s", ex.getMessage()), ex);
+        } catch (Exception e) {
+            // fail the task rather than swallowing the error
+            throw new ConnectException("Kusto Connector failed to start.", e);
+        }
+    }
+
+    @Override
+    public void stop() throws ConnectException {
+        for (TopicPartitionWriter writer : writers.values()) {
+            writer.close();
+        }
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> records) throws ConnectException {
+        for (SinkRecord record : records) {
+            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
+            TopicPartitionWriter writer = writers.get(tp);
+
+            if (writer == null) {
+                throw new NotFoundException(String.format("Received a record without a mapped writer for topic:partition(%s:%d), dropping record.", tp.topic(), tp.partition()));
+            }
+
+            writer.writeRecord(record);
+        }
+    }
+
+    // This is a neat trick: since our rolling files commit whenever they like, offsets may drift
+    // from what Kafka expects. This re-syncs topic-partition offsets with what the sink has actually ingested.
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> preCommit(
+            Map<TopicPartition, OffsetAndMetadata> offsets
+    ) {
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        for (TopicPartition tp : assignment) {
+
+            Long offset = writers.get(tp).lastCommitedOffset;
+
+            if (offset != null) {
+                log.trace("Forwarding to framework request to commit offset: {} for {}", offset, tp);
+                offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
+            }
+        }
+        return offsetsToCommit;
+    }
+
+    @Override
+    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws ConnectException {
+        // do nothing, rolling files handle the writing
+    }
+}
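To make the mapping rules above concrete (illustrative names): a `kusto.table` setting maps every topic via the `*` wildcard, otherwise `kusto.tables.topics_mapping` is split on ';' and ':':

    // kusto.tables.topics_mapping=logs:logs_table;metrics:metrics_table
    Map<String, String> mapping = KustoSinkTask.getTopicsToTables(config);
    mapping.get("logs");     // -> "logs_table"
    mapping.get("metrics");  // -> "metrics_table"
    // with kusto.table=central_table instead, getTable(anyTopic) returns "central_table"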
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java
new file mode 100644
index 0000000..bab8de7
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java
@@ -0,0 +1,94 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestClient;
+import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestionProperties;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+
+public class TopicPartitionWriter {
+    GZIPFileWriter gzipFileWriter;
+
+    TopicPartition tp;
+    KustoIngestClient client;
+
+    String database;
+    String table;
+
+    String basePath;
+    long fileThreshold;
+
+    long currentOffset;
+    Long lastCommitedOffset;
+
+    TopicPartitionWriter(
+            TopicPartition tp, KustoIngestClient client,
+            String database, String table,
+            String basePath, long fileThreshold
+    ) {
+        this.tp = tp;
+        this.client = client;
+        this.table = table;
+        this.fileThreshold = fileThreshold;
+        this.database = database;
+        this.basePath = basePath;
+        this.currentOffset = 0;
+    }
+
+    public void handleRollFile(GZIPFileDescriptor fileDescriptor) {
+        KustoIngestionProperties properties = new KustoIngestionProperties(database, table, fileDescriptor.rawBytes);
+
+        try {
+            client.ingestFromSingleFile(fileDescriptor.path, properties);
+            lastCommitedOffset = currentOffset;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public String getFilePath() {
+        long nextOffset = gzipFileWriter != null && gzipFileWriter.isDirty() ? currentOffset + 1 : currentOffset;
+        return Paths.get(basePath, String.format("kafka_%s_%s_%d", tp.topic(), tp.partition(), nextOffset)).toString();
+    }
+
+    public void writeRecord(SinkRecord record) {
+        byte[] value;
+        // todo: should probably handle more schemas
+        if (record.valueSchema() == null || record.valueSchema() == Schema.STRING_SCHEMA) {
+            value = record.value().toString().getBytes(StandardCharsets.UTF_8);
+        } else if (record.valueSchema() == Schema.BYTES_SCHEMA) {
+            value = (byte[]) record.value();
+        } else {
+            throw new ConnectException("Unexpected value type, can only handle strings and bytes");
+        }
+
+        try {
+            currentOffset = record.kafkaOffset();
+
+            gzipFileWriter.write(value);
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void open() {
+        gzipFileWriter = new GZIPFileWriter(basePath, fileThreshold, this::handleRollFile, this::getFilePath);
+    }
+
+    public void close() {
+        try {
+            gzipFileWriter.rollback();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
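The temp-file naming above encodes topic, partition and the offset the next file starts at; for example (illustrative values):

    // tp = new TopicPartition("logs", 0); currentOffset = 41 while a dirty file is open
    writer.getFilePath(); // -> "<tempdir>/kafka_logs_0_42" (GZIPFileWriter appends ".gz")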
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/Version.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/Version.java
new file mode 100644
index 0000000..3f8b6db
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/Version.java
@@ -0,0 +1,29 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+public class Version {
+    private static final Logger log = LoggerFactory.getLogger(Version.class);
+    private static final String VERSION_FILE = "/azure-kusto-kafka-sink-version.properties";
+    private static String version = "unknown";
+
+    static {
+        try {
+            Properties props = new Properties();
+            try (InputStream versionFileStream = Version.class.getResourceAsStream(VERSION_FILE)) {
+                props.load(versionFileStream);
+                version = props.getProperty("version", version).trim();
+            }
+        } catch (Exception e) {
+            log.warn("Error while loading version:", e);
+        }
+    }
+
+    public static String getVersion() {
+        return version;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/AadAuthenticationHelper.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/AadAuthenticationHelper.java
new file mode 100644
index 0000000..d42aea7
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/AadAuthenticationHelper.java
@@ -0,0 +1,88 @@
+package com.microsoft.azure.kusto.kafka.connect.sink.client;
+
+import com.microsoft.aad.adal4j.*;
+
+import javax.naming.ServiceUnavailableException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class AadAuthenticationHelper {
+
+    private final static String c_microsoftAadTenantId = "72f988bf-86f1-41af-91ab-2d7cd011db47";
+    private final static String c_kustoClientId = "ad30ae9e-ac1b-4249-8817-d24f5d7ad3de";
+
+    private ClientCredential m_clientCredential;
+    private String m_userUsername;
+    private String m_userPassword;
+    private String m_clusterUrl;
+    private String m_aadAuthorityId;
+    private String m_aadAuthorityUri;
+
+    public static String getMicrosoftAadAuthorityId() { return c_microsoftAadTenantId; }
+
+    public AadAuthenticationHelper(KustoConnectionStringBuilder kcsb) throws Exception {
+        m_clusterUrl = kcsb.getClusterUrl();
+
+        if (!Utils.isNullOrEmpty(kcsb.getApplicationClientId()) && !Utils.isNullOrEmpty(kcsb.getApplicationKey())) {
+            m_clientCredential = new ClientCredential(kcsb.getApplicationClientId(), kcsb.getApplicationKey());
+        } else {
+            m_userUsername = kcsb.getUserUsername();
+            m_userPassword = kcsb.getUserPassword();
+        }
+
+        // Set the AAD Authority URI
+        m_aadAuthorityId = (kcsb.getAuthorityId() == null ? c_microsoftAadTenantId : kcsb.getAuthorityId());
+        m_aadAuthorityUri = "https://login.microsoftonline.com/" + m_aadAuthorityId + "/oauth2/authorize";
+    }
+
+    public String acquireAccessToken() throws Exception {
+        if (m_clientCredential != null) {
+            return acquireAadApplicationAccessToken().getAccessToken();
+        } else {
+            return acquireAadUserAccessToken().getAccessToken();
+        }
+    }
+
+    private AuthenticationResult acquireAadUserAccessToken() throws Exception {
+        AuthenticationContext context;
+        AuthenticationResult result;
+        ExecutorService service = null;
+        try {
+            service = Executors.newFixedThreadPool(1);
+            context = new AuthenticationContext(m_aadAuthorityUri, true, service);
+
+            Future<AuthenticationResult> future = context.acquireToken(
+                    m_clusterUrl, c_kustoClientId, m_userUsername, m_userPassword,
+                    null);
+            result = future.get();
+        } finally {
+            // guard against an NPE if the executor was never created
+            if (service != null) {
+                service.shutdown();
+            }
+        }
+
+        if (result == null) {
+            throw new ServiceUnavailableException("acquireAadUserAccessToken got 'null' authentication result");
+        }
+        return result;
+    }
+
+    private AuthenticationResult acquireAadApplicationAccessToken() throws Exception {
+        AuthenticationContext context;
+        AuthenticationResult result;
+        ExecutorService service = null;
+        try {
+            service = Executors.newFixedThreadPool(1);
+            context = new AuthenticationContext(m_aadAuthorityUri, true, service);
+            Future<AuthenticationResult> future = context.acquireToken(m_clusterUrl, m_clientCredential, null);
+            result = future.get();
+        } finally {
+            if (service != null) {
+                service.shutdown();
+            }
+        }
+
+        if (result == null) {
+            throw new ServiceUnavailableException("acquireAadApplicationAccessToken got 'null' authentication result");
+        }
+        return result;
+    }
+
+}
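A sketch of how the authentication pieces compose (hypothetical cluster and credentials; a null authority falls back to the Microsoft tenant):

    KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials(
            "https://mycluster.kusto.windows.net",
            "some-application-client-id",
            "some-application-key",
            null);
    AadAuthenticationHelper authHelper = new AadAuthenticationHelper(kcsb);
    String token = authHelper.acquireAccessToken(); // blocks on ADAL's acquireToken future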
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/AzureStorageHelper.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/AzureStorageHelper.java
new file mode 100644
index 0000000..bb6f28e
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/AzureStorageHelper.java
@@ -0,0 +1,144 @@
+package com.microsoft.azure.kusto.kafka.connect.sink.client;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobOutputStream;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.TableOperation;
+import com.microsoft.azure.storage.table.TableServiceEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.util.zip.GZIPOutputStream;
+
+public class
AzureStorageHelper { + + private static final Logger log = LoggerFactory.getLogger(AzureStorageHelper.class); + private static final int GZIP_BUFFER_SIZE = 16384; + + public static void postMessageToQueue(String queuePath, String content) throws Exception { + + try + { + CloudQueue queue = new CloudQueue(new URI(queuePath)); + CloudQueueMessage queueMessage = new CloudQueueMessage(content); + queue.addMessage(queueMessage); + } + catch (Exception e) + { + log.error(String.format("postMessageToQueue: %s.",e.getMessage()), e); + throw e; + } + } + + private static void postMessageStorageAccountNotWorkingYet(String content) throws URISyntaxException, InvalidKeyException, StorageException { + // Retrieve storage account from connection-string. + CloudStorageAccount storageAccount = CloudStorageAccount.parse(""); + + // Create the queue client. + CloudQueueClient queueClient = storageAccount.createCloudQueueClient(); + + // Retrieve a reference to a queue. + CloudQueue queue = queueClient.getQueueReference("myqueue"); + + // Create the queue if it doesn't already exist. + queue.createIfNotExists(); + + // Create a message and add it to the queue. + CloudQueueMessage message = new CloudQueueMessage(content); + queue.addMessage(message); + } + + public static void postMessageToQueue2(String queueName, String content, int invisibleTimeInSeconds) throws Exception { + + try { + + final String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=6txkstrldaaoa01;AccountKey=****;EndpointSuffix=core.windows.net"; + final CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); + final CloudQueueClient queueClient = storageAccount.createCloudQueueClient(); + CloudQueue queue = queueClient.getQueueReference("readyforaggregation"); + + + + CloudQueueMessage cloudQueueMessage = new CloudQueueMessage("{\"Id\":\"bfe6ae6a-3582-42fd-b871-f2749cda62d7\",\"BlobPath\":\"https://6txkstrldaaoa01.blob.core.windows.net/8up-20171106-temp-e5c334ee145d4b43a3a2d3a96fbac1df/41ef7c03-851d-461e-91eb-34fc91e6a789?sig=gLnw2ZXjh8D3hitwfQuVRcOz7QCMm3S3msLLKRmyTvY%3D&st=2017-11-07T05%3A08%3A05Z&se=2017-11-09T05%3A08%3A05Z&sv=2017-04-17&si=DownloadPolicy&sp=rwd&sr=b\",\"RawDataSize\":1645,\"DatabaseName\":\"AdobeAnalytics\",\"TableName\":\"RonylTest\",\"RetainBlobOnSuccess\":false,\"Format\":\"tsv\",\"FlushImmediately\":false,\"ReportLevel\":0}"); + queue.addMessage(cloudQueueMessage); + } catch (Exception e) { + // Output the stack trace. + e.printStackTrace(); + } + } + + public static void azureTableInsertEntity(String tableUri, TableServiceEntity entity) throws StorageException, URISyntaxException { + CloudTable table = new CloudTable(new URI(tableUri)); + // Create an operation to add the new customer to the table basics table. + TableOperation insert = TableOperation.insert(entity); + // Submit the operation to the table service. 
+ table.execute(insert); + } + + public static CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws Exception{ + try { + log.debug(String.format("uploadLocalFileToBlob: filePath: %s, blobName: %s, storageUri: %s", filePath, blobName, storageUri)); + + // Check if the file is already compressed: + boolean isCompressed = filePath.endsWith(".gz") || filePath.endsWith(".zip"); + + CloudBlobContainer container = new CloudBlobContainer(new URI(storageUri)); + File sourceFile = new File(filePath); + + //Getting a blob reference + CloudBlockBlob blob; + + if(!isCompressed) + { + blob = container.getBlockBlobReference(blobName+".gz"); + InputStream fin = Files.newInputStream(Paths.get(filePath)); + BlobOutputStream bos = blob.openOutputStream(); + GZIPOutputStream gzout = new GZIPOutputStream(bos); + + byte[] buffer = new byte[GZIP_BUFFER_SIZE]; + int length; + while ((length = fin.read(buffer)) > 0) { + gzout.write(buffer, 0, length); + } + + gzout.close(); + fin.close(); + + } else { + blob = container.getBlockBlobReference(blobName); + blob.uploadFromFile(sourceFile.getAbsolutePath()); + } + + return blob; + } + catch (StorageException se) + { + log.error(String.format("uploadLocalFileToBlob: Error returned from the service. Http code: %d and error code: %s", se.getHttpStatusCode(), se.getErrorCode()), se); + throw se; + } + catch (Exception ex) + { + log.error(String.format("uploadLocalFileToBlob: Error while uploading file to blob."), ex); + throw ex; + } + } + + public static String getBlobPathWithSas(CloudBlockBlob blob) { + StorageCredentialsSharedAccessSignature signature = (StorageCredentialsSharedAccessSignature)blob.getServiceClient().getCredentials(); + return blob.getStorageUri().getPrimaryUri().toString() + "?" 
+ signature.getToken(); + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/BlobDescription.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/BlobDescription.java new file mode 100644 index 0000000..8e9c251 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/BlobDescription.java @@ -0,0 +1,49 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +import java.util.UUID; + +public class BlobDescription { + private String m_blobPath; + private Long m_blobSize; + private UUID m_sourceId; + + public String getBlobPath() + { + return m_blobPath; + } + + public void setBlobPath(String blobPath) + { + m_blobPath = blobPath; + } + + public Long getBlobSize() + { + return m_blobSize; + } + + public void setBlobSize(Long blobSize) + { + m_blobSize = blobSize; + } + + public UUID getSourceId() + { + return m_sourceId; + } + + public void setSourceId(UUID sourceId) + { + m_sourceId = sourceId; + } + + public BlobDescription() + { + } + + public BlobDescription(String blobPath, Long blobSize) + { + m_blobPath = blobPath; + m_blobSize = blobSize; + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/CslCommandGenerator.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/CslCommandGenerator.java new file mode 100644 index 0000000..25b529b --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/CslCommandGenerator.java @@ -0,0 +1,19 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +public class CslCommandGenerator { + public static String generateDmEventHubSourceSettingsShowCommand() + { + String command = ".show EventHub ingestion sources settings"; + return command; + } + + public static String generateIngestionResourcesShowCommand() { + String command = ".show ingestion resources"; + return command; + } + + public static String generateKustoIdentityGetCommand() { + String command = ".get kusto identity token"; + return command; + } +} \ No newline at end of file diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IKustoIngestionResult.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IKustoIngestionResult.java new file mode 100644 index 0000000..eee5a92 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IKustoIngestionResult.java @@ -0,0 +1,14 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +import java.net.URISyntaxException; +import java.util.List; + +import com.microsoft.azure.storage.StorageException; + +public interface IKustoIngestionResult { + /// + /// Retrieves the detailed ingestion status of + /// all data ingestion operations into Kusto associated with this IKustoIngestionResult instance. 
+    ///
+    List<IngestionStatus> GetIngestionStatusCollection() throws StorageException, URISyntaxException;
+}
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestFromMultipleBlobsCallable.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestFromMultipleBlobsCallable.java
new file mode 100644
index 0000000..e028b1a
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestFromMultipleBlobsCallable.java
@@ -0,0 +1,79 @@
+package com.microsoft.azure.kusto.kafka.connect.sink.client;
+
+import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoClientAggregateException;
+import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoClientException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+public class IngestFromMultipleBlobsCallable implements Callable<Object> {
+
+    private final String m_ingestionQueueUri;
+    private List<String> m_blobPaths;
+    private Boolean m_deleteSourceOnSuccess;
+    private KustoIngestionProperties m_ingestionProperties;
+
+
+    public IngestFromMultipleBlobsCallable(List<String> blobPaths, Boolean deleteSourceOnSuccess, KustoIngestionProperties ingestionProperties, final String ingestionQueueUri) {
+        m_blobPaths = blobPaths;
+        m_deleteSourceOnSuccess = deleteSourceOnSuccess;
+        m_ingestionProperties = ingestionProperties;
+        m_ingestionQueueUri = ingestionQueueUri;
+    }
+
+    @Override
+    public Object call() throws Exception {
+        if (m_blobPaths == null || m_blobPaths.size() == 0) {
+            throw new KustoClientException("blobs must have at least 1 path");
+        }
+
+        List<KustoClientException> ingestionErrors = new LinkedList<>();
+
+        for (String blobPath : m_blobPaths) {
+            try {
+                // Create the ingestion message
+                IngestionBlobInfo ingestionBlobInfo = new IngestionBlobInfo(blobPath, m_ingestionProperties.getDatabaseName(), m_ingestionProperties.getTableName());
+                ingestionBlobInfo.rawDataSize = estimateBlobRawSize(blobPath);
+                ingestionBlobInfo.retainBlobOnSuccess = !m_deleteSourceOnSuccess;
+                ingestionBlobInfo.reportLevel = m_ingestionProperties.getReportLevel();
+                ingestionBlobInfo.reportMethod = m_ingestionProperties.getReportMethod();
+                ingestionBlobInfo.flushImmediately = m_ingestionProperties.getFlushImmediately();
+                ingestionBlobInfo.additionalProperties = m_ingestionProperties.getAdditionalProperties();
+
+                ObjectMapper objectMapper = new ObjectMapper();
+                String serializedIngestionBlobInfo = objectMapper.writeValueAsString(ingestionBlobInfo);
+
+                AzureStorageHelper.postMessageToQueue(m_ingestionQueueUri, serializedIngestionBlobInfo);
+            } catch (Exception ex) {
+                ingestionErrors.add(new KustoClientException(blobPath, "failed to post message to queue", ex));
+            }
+        }
+
+        if (ingestionErrors.size() > 0) {
+            throw new KustoClientAggregateException(ingestionErrors);
+        }
+        return null;
+    }
+
+
+    private Long estimateBlobRawSize(String blobPath) throws Exception {
+
+        try {
+            CloudBlockBlob blockBlob = new CloudBlockBlob(new URI(blobPath));
+            blockBlob.downloadAttributes();
+            long length = blockBlob.getProperties().getLength();
+
+            if (length == 0) {
+                return null;
+            }
+            return length;
+
+        } catch (Exception e) {
+            return null;
+        }
+    }
+}
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionBlobInfo.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionBlobInfo.java
new
file mode 100644 index 0000000..a123955 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionBlobInfo.java @@ -0,0 +1,30 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +import java.util.Map; +import java.util.UUID; + +final public class IngestionBlobInfo { + public String blobPath; + public Long rawDataSize; + public String databaseName; + public String tableName; + public UUID id; + public Boolean retainBlobOnSuccess; + public KustoIngestionProperties.IngestionReportLevel reportLevel; + public KustoIngestionProperties.IngestionReportMethod reportMethod; + public Boolean flushImmediately; + public IngestionStatusInTableDescription IngestionStatusInTable; + + public Map additionalProperties; + + public IngestionBlobInfo(String blobPath, String databaseName, String tableName) { + this.blobPath = blobPath; + this.databaseName = databaseName; + this.tableName = tableName; + id = UUID.randomUUID(); + retainBlobOnSuccess = false; + flushImmediately = false; + reportLevel = KustoIngestionProperties.IngestionReportLevel.FailuresOnly; + reportMethod = KustoIngestionProperties.IngestionReportMethod.Queue; + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionErrorCode.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionErrorCode.java new file mode 100644 index 0000000..b922aa6 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionErrorCode.java @@ -0,0 +1,218 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +public enum IngestionErrorCode { + /// + /// Unknown error occurred + /// + Unknown, + + /// + /// Low memory condition. + /// + Stream_LowMemoryCondition, + + /// + /// Wrong number of fields. + /// + Stream_WrongNumberOfFields, + + /// + /// Input stream/record/field too large. + /// + Stream_InputStreamTooLarge, + + /// + /// No data streams to ingest + /// + Stream_NoDataToIngest, + + /// + /// Invalid csv format - closing quote missing. + /// + Stream_ClosingQuoteMissing, + + /// + /// Failed to download source from Azure storage - source not found + /// + Download_SourceNotFound, + + /// + /// Failed to download source from Azure storage - access condition not satisfied + /// + Download_AccessConditionNotSatisfied, + + /// + /// Failed to download source from Azure storage - access forbidden + /// + Download_Forbidden, + + /// + /// Failed to download source from Azure storage - account not found + /// + Download_AccountNotFound, + + /// + /// Failed to download source from Azure storage - bad request + /// + Download_BadRequest, + + /// + /// Failed to download source from Azure storage - not transient error + /// + Download_NotTransient, + + /// + /// Failed to download source from Azure storage - unknown error + /// + Download_UnknownError, + + /// + /// Failed to invoke update policy. Query schema does not match table schema + /// + UpdatePolicy_QuerySchemaDoesNotMatchTableSchema, + + /// + /// Failed to invoke update policy. Failed descendant transactional update policy + /// + UpdatePolicy_FailedDescendantTransaction, + + /// + /// Failed to invoke update policy. Ingestion Error occurred + /// + UpdatePolicy_IngestionError, + + /// + /// Failed to invoke update policy. 
Unknown error occurred + /// + UpdatePolicy_UnknownError, + + /// + /// Json pattern was not ingested with jsonMapping parameter + /// + BadRequest_MissingJsonMappingtFailure, + + /// + /// Blob is invalid or empty zip archive + /// + BadRequest_InvalidOrEmptyBlob, + + /// + /// Database does not exist + /// + BadRequest_DatabaseNotExist, + + /// + /// Table does not exist + /// + BadRequest_TableNotExist, + + /// + /// Invalid kusto identity token + /// + BadRequest_InvalidKustoIdentityToken, + + /// + /// Blob path without SAS from unknown blob storage + /// + BadRequest_UriMissingSas, + + /// + /// File too large + /// + BadRequest_FileTooLarge, + + /// + /// No valid reply from ingest command + /// + BadRequest_NoValidResponseFromEngine, + + /// + /// Access to table is denied + /// + BadRequest_TableAccessDenied, + + /// + /// Message is exhausted + /// + BadRequest_MessageExhausted, + + /// + /// Bad request + /// + General_BadRequest, + + /// + /// Internal server error occurred + /// + General_InternalServerError, + + /// + /// Failed to invoke update policy. Cyclic update is not allowed + /// + UpdatePolicy_Cyclic_Update_Not_Allowed, + + /// + /// Failed to invoke update policy. Transactional update policy is not allowed in streaming ingestion + /// + UpdatePolicy_Transactional_Not_Allowed_In_Streaming_Ingestion, + + /// + /// Failed to parse csv mapping. + /// + BadRequest_InvalidCsvMapping, + + /// + /// Invalid mapping reference. + /// + BadRequest_InvalidMappingReference, + + /// + /// Mapping reference wasn't found. + /// + BadRequest_MappingReferenceWasNotFound, + + /// + /// Failed to parse json mapping. + /// + BadRequest_InvalidJsonMapping, + + /// + /// Format is not supported + /// + BadRequest_FormatNotSupported, + + /// + /// Ingestion properties contains ingestion mapping and ingestion mapping reference. + /// + BadRequest_DuplicateMapping, + + /// + /// Message is corrupted + /// + BadRequest_CorruptedMessage, + + /// + /// Inconsistent ingestion mapping + /// + BadRequest_InconsistentMapping, + + /// + /// Syntax error + /// + BadRequest_SyntaxError, + + /// + /// Abandoned ingestion. + /// + General_AbandonedIngestion, + + /// + /// Throttled ingestion. + /// + General_ThrottledIngestion, + + /// + /// Schema of target table at start time doesn't match the one at commit time. 
+ /// + General_TransientSchemaMismatch, +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionFailureInfo.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionFailureInfo.java new file mode 100644 index 0000000..a6ffd11 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionFailureInfo.java @@ -0,0 +1,35 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client;//import org.joda.time.DateTime; + +import java.util.Date; +import java.util.UUID; + +public class IngestionFailureInfo { + public UUID OperationId; + public String Database; + public String Table; + public Date FailedOn; + public UUID IngestionSourceId; + public String IngestionSourcePath; + public String Details; + public FailureStatusValue FailureStatus; + public UUID RootActivityId; + public Boolean OriginatesFromUpdatePolicy; + + public enum FailureStatusValue { + Unknown(0), + Permanent(1), + Transient(2), + Exhausted(3); + + private final int value; + + FailureStatusValue(int v) { + value = v; + } + + public int getValue() { + return value; + } + } +} + diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionStatus.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionStatus.java new file mode 100644 index 0000000..ea8c51d --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionStatus.java @@ -0,0 +1,185 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +import java.time.Instant; +import java.util.Date; +import java.util.UUID; + +import com.microsoft.azure.storage.table.TableServiceEntity; + +/// +/// This class represents an ingestion status. +/// +/// +/// Any change to this class must be made in a backwards/forwards-compatible manner. +/// +public class IngestionStatus extends TableServiceEntity { + /// + /// The updated status of the ingestion. The ingestion status will be 'Pending' + /// during the ingestion's process + /// and will be updated as soon as the ingestion completes. + /// + public OperationStatus status; + + public String getStatus() { + return status.toString(); + } + + public void setStatus(String s) { + status = OperationStatus.valueOf(s); + } + + /// + /// A unique identifier representing the ingested source. Can be supplied during + /// the ingestion execution. + /// + public UUID ingestionSourceId; + + public UUID getIngestionSourceId() { + return ingestionSourceId; + } + + public void setIngestionSourceId(UUID id) { + ingestionSourceId = id; + } + + /// + /// The URI of the blob, potentially including the secret needed to access + /// the blob. This can be a file system URI (on-premises deployments only), + /// or an Azure Blob Storage URI (including a SAS key or a semicolon followed + /// by the account key) + /// + public String ingestionSourcePath; + + public String getIngestionSourcePath() { + return ingestionSourcePath; + } + + public void setIngestionSourcePath(String path) { + ingestionSourcePath = path; + } + + /// + /// The name of the database holding the target table. + /// + public String database; + + public String getDatabase() { + return database; + } + + public void setDatabase(String db) { + database = db; + } + + /// + /// The name of the target table into which the data will be ingested. 
+ /// + public String table; + + public String getTable() { + return table; + } + + public void setTable(String t) { + table = t; + } + + /// + /// The last updated time of the ingestion status. + /// + public Date updatedOn; + + public Date getUpdatedOn() { + return updatedOn; + } + + public void setUpdatedOn(Date lastUpdated) { + updatedOn = lastUpdated; + } + + /// + /// The ingestion's operation Id. + /// + public UUID operationId; + + public UUID getOperationId() { + return operationId; + } + + public void setOperationId(UUID id) { + operationId = id; + } + + /// + /// The ingestion's activity Id. + /// + public UUID activityId; + + public UUID getActivityId() { + return activityId; + } + + public void setActivityId(UUID id) { + activityId = id; + } + + /// + /// In case of a failure - indicates the failure's error code. + /// + public IngestionErrorCode errorCode; + + public String getErrorCode() { + return (errorCode != null ? errorCode : IngestionErrorCode.Unknown).toString(); + } + + public void setErrorCode(String code) { + errorCode = code == null ? IngestionErrorCode.Unknown : IngestionErrorCode.valueOf(code); + } + + /// + /// In case of a failure - indicates the failure's status. + /// + public IngestionFailureInfo.FailureStatusValue failureStatus; + + public String getFailureStatus() { + return (failureStatus != null ? failureStatus : IngestionFailureInfo.FailureStatusValue.Unknown).toString(); + } + + public void setFailureStatus(String status) { + failureStatus = IngestionFailureInfo.FailureStatusValue.valueOf(status); + } + + /// + /// In case of a failure - indicates the failure's details. + /// + public String details; + + public String getDetails() { + return details; + } + + public void setDetails(String d) { + details = d; + } + + /// + /// In case of a failure - indicates whether or not the failures originate from + /// an Update Policy. 
+ /// + public boolean originatesFromUpdatePolicy; + + public boolean getOriginatesFromUpdatePolicy() { + return originatesFromUpdatePolicy; + } + + public void setOriginatesFromUpdatePolicy(boolean fromUpdatePolicy) { + originatesFromUpdatePolicy = fromUpdatePolicy; + } + + public IngestionStatus() { + } + + public IngestionStatus(UUID uuid) { + super(uuid.toString(), uuid.toString()); + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionStatusInTableDescription.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionStatusInTableDescription.java new file mode 100644 index 0000000..a07f153 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/IngestionStatusInTableDescription.java @@ -0,0 +1,7 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +public class IngestionStatusInTableDescription { + public String TableConnectionString; + public String PartitionKey; + public String RowKey; +} \ No newline at end of file diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClient.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClient.java new file mode 100644 index 0000000..85e7bf8 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClient.java @@ -0,0 +1,42 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +import org.json.JSONObject; + +public class KustoClient { + + private final String c_adminCommandsPrefix = "."; + private final String c_apiVersion = "v1"; + private final String c_defaultDatabaseName = "NetDefaultDb"; + + private AadAuthenticationHelper m_aadAuthenticationHelper; + private String m_clusterUrl; + + public KustoClient(KustoConnectionStringBuilder kcsb) throws Exception { + m_clusterUrl = kcsb.getClusterUrl(); + m_aadAuthenticationHelper = new AadAuthenticationHelper(kcsb); + } + + public KustoResults execute(String command) throws Exception { + return execute(c_defaultDatabaseName, command); + } + + public KustoResults execute(String database, String command) throws Exception { + String clusterEndpoint; + if (command.startsWith(c_adminCommandsPrefix)) { + clusterEndpoint = String.format("%s/%s/rest/mgmt", m_clusterUrl, c_apiVersion); + } else { + clusterEndpoint = String.format("%s/%s/rest/query", m_clusterUrl, c_apiVersion); + } + return execute(database, command, clusterEndpoint); + } + + private KustoResults execute(String database, String command, String clusterEndpoint) throws Exception { + String aadAccessToken = m_aadAuthenticationHelper.acquireAccessToken(); + + String jsonString = new JSONObject() + .put("db", database) + .put("csl", command).toString(); + + return Utils.post(clusterEndpoint, aadAccessToken, jsonString); + } +} \ No newline at end of file diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoClientAggregateException.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoClientAggregateException.java new file mode 100644 index 0000000..457ba01 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoClientAggregateException.java @@ -0,0 +1,15 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions; + +import java.util.List; + +public class KustoClientAggregateException extends Exception{ + + List m_kustoClientExceptions; + + public List 
getExceptions() { return m_kustoClientExceptions; } + + public KustoClientAggregateException(List kustoClientExceptions) + { + m_kustoClientExceptions = kustoClientExceptions; + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoClientException.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoClientException.java new file mode 100644 index 0000000..bdec630 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoClientException.java @@ -0,0 +1,20 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions; + +public class KustoClientException extends Exception { + private String m_ingestionSource; + + public String getIngestionSource() { return m_ingestionSource; } + + public KustoClientException(String message) { + super(message); + } + + public KustoClientException(String message, Exception exception) { + super(message, exception); + } + + public KustoClientException(String ingestionSource, String message, Exception exception) { + super(message, exception); + m_ingestionSource = ingestionSource; + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoWebException.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoWebException.java new file mode 100644 index 0000000..979ee14 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoClientExceptions/KustoWebException.java @@ -0,0 +1,18 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions; + +import org.apache.http.HttpResponse; + +public class KustoWebException extends Exception{ + + private String m_message; + private HttpResponse m_httpResponse; + + public String getMessage() { return m_message; } + + public HttpResponse getHttpResponse() { return m_httpResponse; } + + public KustoWebException(String message, HttpResponse httpResponse) { + m_message = message; + m_httpResponse = httpResponse; + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoConnectionStringBuilder.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoConnectionStringBuilder.java new file mode 100644 index 0000000..cea14f0 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoConnectionStringBuilder.java @@ -0,0 +1,66 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +public class KustoConnectionStringBuilder { + + private String m_clusterUri; + private String m_username; + private String m_password; + private String m_applicationClientId; + private String m_applicationKey; + private String m_aadAuthorityId; // AAD tenant Id (GUID) + + public String getClusterUrl() { return m_clusterUri; } + public String getUserUsername() { return m_username; } + public String getUserPassword() { return m_password; } + public String getApplicationClientId() { return m_applicationClientId; } + public String getApplicationKey() { return m_applicationKey; } + public String getAuthorityId() { return m_aadAuthorityId; } + + private KustoConnectionStringBuilder(String resourceUri) + { + m_clusterUri = resourceUri; + m_username = null; + m_password = null; + m_applicationClientId = null; + m_applicationKey = null; + m_aadAuthorityId = null; + } + + public static KustoConnectionStringBuilder 
createWithAadUserCredentials(String resourceUri, + String username, + String password, + String authorityId) + { + KustoConnectionStringBuilder kcsb = new KustoConnectionStringBuilder(resourceUri); + kcsb.m_username = username; + kcsb.m_password = password; + kcsb.m_aadAuthorityId = authorityId; + return kcsb; + } + + public static KustoConnectionStringBuilder createWithAadUserCredentials(String resourceUri, + String username, + String password) + { + return createWithAadUserCredentials(resourceUri, username, password, null); + } + + public static KustoConnectionStringBuilder createWithAadApplicationCredentials(String resourceUri, + String applicationClientId, + String applicationKey, + String authorityId) + { + KustoConnectionStringBuilder kcsb = new KustoConnectionStringBuilder(resourceUri); + kcsb.m_applicationClientId = applicationClientId; + kcsb.m_applicationKey = applicationKey; + kcsb.m_aadAuthorityId = authorityId; + return kcsb; + } + + public static KustoConnectionStringBuilder createWithAadApplicationCredentials(String resourceUri, + String applicationClientId, + String applicationKey) + { + return createWithAadApplicationCredentials(resourceUri, applicationClientId, applicationKey, null); + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoIngestClient.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoIngestClient.java new file mode 100644 index 0000000..9f70c96 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoIngestClient.java @@ -0,0 +1,222 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoClientAggregateException; +import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoClientException; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URI; +import java.sql.Date; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; + +public class KustoIngestClient { + public static final int COMPRESSED_FILE_MULTIPLIER = 11; + public final Logger log = LoggerFactory.getLogger(KustoIngestClient.class); + public KustoClient m_kustoClient; + public ResourceManager m_resourceManager; + + public KustoIngestClient(KustoConnectionStringBuilder kcsb) throws Exception { + log.info("Creating a new KustoIngestClient"); + m_kustoClient = new KustoClient(kcsb); + m_resourceManager = new ResourceManager(m_kustoClient); + } + + /* + * public Future ingestFromMultipleBlobsPaths(List blobPaths, Boolean + * deleteSourceOnSuccess, KustoIngestionProperties ingestionProperties){ + * + * ExecutorService executorService = Executors.newSingleThreadExecutor(); + * + * return executorService.submit(new IngestFromMultipleBlobsCallable(blobPaths, + * deleteSourceOnSuccess, ingestionProperties, c_ingestionQueueUri)); } + */ + + public IKustoIngestionResult ingestFromMultipleBlobsPaths(List blobPaths, Boolean deleteSourceOnSuccess, + KustoIngestionProperties ingestionProperties) throws Exception { + + // ingestFromMultipleBlobsAsync(blobPaths, deleteSourceOnSuccess, + // ingestionProperties).get(); + List blobDescriptions = blobPaths.stream().map(b -> new BlobDescription(b, null)) + 
.collect(Collectors.toList());
+        return ingestFromMultipleBlobsImpl(blobDescriptions, deleteSourceOnSuccess, ingestionProperties);
+    }
+
+    /*
+     * public Future ingestFromSingleBlob(String blobPath, Boolean deleteSourceOnSuccess,
+     KustoIngestionProperties ingestionProperties, Long rawDataSize){
+     *
+     * ExecutorService executorService = Executors.newSingleThreadExecutor();
+     *
+     * return executorService.submit(new IngestFromMultipleBlobsCallable(blobPaths,
+     * deleteSourceOnSuccess, ingestionProperties, c_ingestionQueueUri)); }
+     */
+
+    public IKustoIngestionResult ingestFromSingleBlob(String blobPath, Boolean deleteSourceOnSuccess,
+            KustoIngestionProperties ingestionProperties, Long rawDataSize) throws Exception {
+
+        BlobDescription blobDescription = new BlobDescription(blobPath, rawDataSize);
+        return ingestFromMultipleBlobsImpl(new ArrayList<>(Arrays.asList(blobDescription)), deleteSourceOnSuccess, ingestionProperties);
+    }
+
+    public IKustoIngestionResult ingestFromMultipleBlobs(List<BlobDescription> blobDescriptions,
+            Boolean deleteSourceOnSuccess, KustoIngestionProperties ingestionProperties) throws Exception {
+        return ingestFromMultipleBlobsImpl(blobDescriptions, deleteSourceOnSuccess, ingestionProperties);
+    }
+
+    public IKustoIngestionResult ingestFromMultipleBlobsImpl(List<BlobDescription> blobDescriptions,
+            Boolean deleteSourceOnSuccess, KustoIngestionProperties ingestionProperties) throws Exception {
+
+        // ingestFromMultipleBlobsAsync(blobPaths, deleteSourceOnSuccess,
+        // ingestionProperties).get();
+        if (blobDescriptions == null || blobDescriptions.size() == 0) {
+            throw new KustoClientException("blobs must have at least 1 path");
+        }
+
+        ingestionProperties.setAuthorizationContextToken(m_resourceManager.getKustoIdentityToken());
+
+        List<KustoClientException> ingestionErrors = new LinkedList<>();
+        List<IngestionStatusInTableDescription> tableStatuses = new LinkedList<>();
+
+        for (BlobDescription blobDescription : blobDescriptions) {
+            try {
+                // Create the ingestion message
+                IngestionBlobInfo ingestionBlobInfo = new IngestionBlobInfo(blobDescription.getBlobPath(),
+                        ingestionProperties.getDatabaseName(), ingestionProperties.getTableName());
+                ingestionBlobInfo.rawDataSize = blobDescription.getBlobSize() != null ?
blobDescription.getBlobSize()
+                        : estimateBlobRawSize(blobDescription);
+                ingestionBlobInfo.retainBlobOnSuccess = !deleteSourceOnSuccess;
+                ingestionBlobInfo.reportLevel = ingestionProperties.getReportLevel();
+                ingestionBlobInfo.reportMethod = ingestionProperties.getReportMethod();
+                ingestionBlobInfo.flushImmediately = ingestionProperties.getFlushImmediately();
+                ingestionBlobInfo.additionalProperties = ingestionProperties.getAdditionalProperties();
+                if (blobDescription.getSourceId() != null) {
+                    ingestionBlobInfo.id = blobDescription.getSourceId();
+                }
+
+                if (ingestionProperties.getReportMethod() != KustoIngestionProperties.IngestionReportMethod.Queue) {
+                    String tableStatusUri = GetTableStatus();
+                    ingestionBlobInfo.IngestionStatusInTable = new IngestionStatusInTableDescription();
+                    ingestionBlobInfo.IngestionStatusInTable.TableConnectionString = tableStatusUri;
+                    ingestionBlobInfo.IngestionStatusInTable.RowKey = ingestionBlobInfo.id.toString();
+                    ingestionBlobInfo.IngestionStatusInTable.PartitionKey = ingestionBlobInfo.id.toString();
+
+                    IngestionStatus status = new IngestionStatus(ingestionBlobInfo.id);
+                    status.database = ingestionProperties.getDatabaseName();
+                    status.table = ingestionProperties.getTableName();
+                    status.status = OperationStatus.Pending;
+                    status.updatedOn = Date.from(Instant.now());
+                    status.ingestionSourceId = ingestionBlobInfo.id;
+                    status.setIngestionSourcePath(blobDescription.getBlobPath());
+
+                    AzureStorageHelper.azureTableInsertEntity(tableStatusUri, status);
+                    tableStatuses.add(ingestionBlobInfo.IngestionStatusInTable);
+                }
+
+                ObjectMapper objectMapper = new ObjectMapper();
+                String serializedIngestionBlobInfo = objectMapper.writeValueAsString(ingestionBlobInfo);
+
+                AzureStorageHelper.postMessageToQueue(m_resourceManager.getAggregationQueue(), serializedIngestionBlobInfo);
+            } catch (Exception ex) {
+                ingestionErrors.add(
+                        new KustoClientException(blobDescription.getBlobPath(), "failed to post message to queue", ex));
+            }
+        }
+
+        if (ingestionErrors.size() > 0) {
+            throw new KustoClientAggregateException(ingestionErrors);
+        }
+
+        return new TableReportKustoIngestionResult(tableStatuses);
+    }
+
+    public void ingestFromSingleFile(String filePath, KustoIngestionProperties ingestionProperties) throws Exception {
+        try {
+            String blobName = genBlobName(filePath, ingestionProperties.getDatabaseName(), ingestionProperties.getTableName());
+            CloudBlockBlob blob = AzureStorageHelper.uploadLocalFileToBlob(filePath, blobName, m_resourceManager.getStorageUri());
+            String blobPath = AzureStorageHelper.getBlobPathWithSas(blob);
+            long rawDataSize = ingestionProperties.getFileSize() != null ? ingestionProperties.getFileSize() : estimateLocalFileSize(filePath);
+
+            ingestFromSingleBlob(blobPath, true, ingestionProperties, rawDataSize);
+
+        } catch (Exception ex) {
+            log.error(String.format("ingestFromSingleFile: Error while uploading file (compression mode): %s. Error: %s", filePath, ex.getMessage()), ex);
+            throw ex;
+        }
+    }
+
+    public List GetAndDiscardTopIngestionFailures() throws Exception {
+        // Get ingestion queues from DM
+        KustoResults failedIngestionsQueues = m_kustoClient
+                .execute(CslCommandGenerator.generateIngestionResourcesShowCommand());
+        String failedIngestionsQueue = failedIngestionsQueues.getValues().get(0)
+                .get(failedIngestionsQueues.getIndexByColumnName("Uri"));
+
+        CloudQueue queue = new CloudQueue(new URI(failedIngestionsQueue));
+
+        Iterable<CloudQueueMessage> messages = queue.retrieveMessages(32, 5000, null, null);
+
+        // Draft behavior: the failure messages are dequeued and discarded; returning
+        // them is left as a TODO for the next release.
+        return null;
+    }
+
+
+    public String GetTableStatus() throws Exception {
+        KustoResults result = m_kustoClient.execute(CslCommandGenerator.generateIngestionResourcesShowCommand());
+        return GetRandomResourceByResourceTypeName(result, "IngestionsStatusTable");
+    }
+
+    public String GetRandomResourceByResourceTypeName(KustoResults kustoResults, String name) {
+        ArrayList<String> results = new ArrayList<>();
+        for (Iterator<ArrayList<String>> it = kustoResults.getValues().iterator(); it.hasNext(); ) {
+            ArrayList<String> next = it.next();
+            if (next.get(0).equals(name)) {
+                results.add(next.get(1));
+            }
+        }
+
+        Random randomizer = new Random();
+        return results.get(randomizer.nextInt(results.size()));
+    }
+
+    public Long estimateBlobRawSize(BlobDescription blobDescription) throws Exception {
+        String blobPath = blobDescription.getBlobPath();
+        CloudBlockBlob blockBlob = new CloudBlockBlob(new URI(blobPath));
+        blockBlob.downloadAttributes();
+        long length = blockBlob.getProperties().getLength();
+
+        if (length == 0) {
+            return null;
+        }
+        if (blobPath.contains(".zip") || blobPath.contains(".gz")) {
+            length = length * COMPRESSED_FILE_MULTIPLIER;
+        }
+
+        return length;
+    }
+
+    public long estimateLocalFileSize(String filePath) {
+        File file = new File(filePath);
+        long fileSize = file.length();
+        if (filePath.endsWith(".zip") || filePath.endsWith(".gz")) {
+            fileSize = fileSize * COMPRESSED_FILE_MULTIPLIER;
+        }
+        return fileSize;
+    }
+
+    public String genBlobName(String filePath, String databaseName, String tableName) {
+        String fileName = (new File(filePath)).getName();
+        return String.format("%s__%s__%s__%s", databaseName, tableName, UUID.randomUUID().toString(), fileName);
+    }
+}
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoIngestionProperties.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoIngestionProperties.java
new file mode 100644
index 0000000..fa19734
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoIngestionProperties.java
@@ -0,0 +1,113 @@
+package com.microsoft.azure.kusto.kafka.connect.sink.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KustoIngestionProperties implements Comparable<KustoIngestionProperties> {
+    private String m_databaseName;
+    private String m_tableName;
+    private Long m_fileSize;
+    private boolean m_flushImmediately;
+    private IngestionReportLevel m_reportLevel;
+    private IngestionReportMethod m_reportMethod;
+    private Map<String, String> m_additionalProperties;
+
+    public KustoIngestionProperties(String databaseName, String tableName, Long fileSize) {
+        m_databaseName = databaseName;
+        m_tableName = tableName;
+        m_reportLevel = IngestionReportLevel.FailuresOnly;
+        m_reportMethod = IngestionReportMethod.Queue;
+        m_flushImmediately = false;
+        m_fileSize = fileSize;
+        m_additionalProperties = new HashMap<>();
+    }
+
+    public String getDatabaseName() {
+        return m_databaseName;
+    }
+
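+    // Editor's sketch (not part of the original patch): typical use of this class,
+    // with placeholder database/table/mapping names, based only on the setters below.
+    //
+    //   KustoIngestionProperties props = new KustoIngestionProperties("db1", "table1", null);
+    //   props.setJsonMappingName("myJsonMapping");          // also sets format=json
+    //   props.setReportMethod(IngestionReportMethod.Table); // report status via the Azure table
+    //   props.setFlushImmediately(true);
+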
+    public Long getFileSize() {
+        return m_fileSize;
+    }
+
+    public String getTableName() {
+        return m_tableName;
+    }
+
+    public boolean getFlushImmediately() {
+        return m_flushImmediately;
+    }
+
+    public void setFlushImmediately(boolean flushImmediately) {
+        m_flushImmediately = flushImmediately;
+    }
+
+    public IngestionReportLevel getReportLevel() {
+        return m_reportLevel;
+    }
+
+    public void setReportLevel(IngestionReportLevel reportLevel) {
+        m_reportLevel = reportLevel;
+    }
+
+    public IngestionReportMethod getReportMethod() {
+        return m_reportMethod;
+    }
+
+    public void setReportMethod(IngestionReportMethod reportMethod) {
+        m_reportMethod = reportMethod;
+    }
+
+    public Map<String, String> getAdditionalProperties() {
+        return m_additionalProperties;
+    }
+
+    public void setDataFormat(DATA_FORMAT dataFormat) {
+        m_additionalProperties.put("format", dataFormat.name());
+    }
+
+    /**
+     * Sets the data format by its name. If the name does not exist, then it does not add it.
+     *
+     * @param dataFormatName
+     */
+    public void setDataFormat(String dataFormatName) {
+        try {
+            m_additionalProperties.put("format", DATA_FORMAT.valueOf(dataFormatName.toLowerCase()).name());
+        } catch (IllegalArgumentException e) {
+            // Unknown format name - as documented above, leave the format property unset.
+        }
+    }
+
+    public void setJsonMappingName(String jsonMappingName) {
+        m_additionalProperties.put("jsonMappingReference", jsonMappingName);
+        m_additionalProperties.put("format", DATA_FORMAT.json.name());
+    }
+
+    public void setCsvMappingName(String mappingName) {
+        m_additionalProperties.put("csvMappingReference", mappingName);
+        m_additionalProperties.put("format", DATA_FORMAT.csv.name());
+    }
+
+    public void setAuthorizationContextToken(String token) {
+        m_additionalProperties.put("authorizationContext", token);
+    }
+
+    @Override
+    public int compareTo(KustoIngestionProperties o) {
+        // Equal database and table compare as 0; ordering of unequal values is unspecified.
+        return this.m_tableName.equals(o.m_tableName) && this.m_databaseName.equals(o.m_databaseName) ? 0 : -1;
+    }
+
+    public enum DATA_FORMAT {csv, tsv, scsv, sohsv, psv, txt, json, singlejson, avro, parquet}
+
+    public enum IngestionReportLevel {
+        FailuresOnly,
+        None,
+        FailuresAndSuccesses
+    }
+
+    public enum IngestionReportMethod {
+        Queue,
+        Table,
+        QueueAndTable
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoResults.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoResults.java
new file mode 100644
index 0000000..ba79ea6
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/KustoResults.java
@@ -0,0 +1,39 @@
+package com.microsoft.azure.kusto.kafka.connect.sink.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class KustoResults {
+    private HashMap<String, Integer> m_columnNameToIndex;
+    private HashMap<String, String> m_columnNameToType;
+    private ArrayList<ArrayList<String>> m_values;
+
+    public KustoResults(HashMap<String, Integer> columnNameToIndex, HashMap<String, String> columnNameToType,
+                        ArrayList<ArrayList<String>> values) {
+        m_columnNameToIndex = columnNameToIndex;
+        m_columnNameToType = columnNameToType;
+        m_values = values;
+    }
+
+    public HashMap<String, Integer> getColumnNameToIndex() {
+        return m_columnNameToIndex;
+    }
+
+    public HashMap<String, String> getColumnNameToType() {
+        return m_columnNameToType;
+    }
+
+    public Integer getIndexByColumnName(String columnName) {
+        return m_columnNameToIndex.get(columnName);
+    }
+
+    public String getTypeByColumnName(String columnName) {
+        return m_columnNameToType.get(columnName);
+    }
+
+    public ArrayList<ArrayList<String>> getValues() {
+        return m_values;
+    }
+}
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/OperationStatus.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/OperationStatus.java
new file mode 100644
index 0000000..29268cd
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/OperationStatus.java
@@ -0,0 +1,39 @@
+package com.microsoft.azure.kusto.kafka.connect.sink.client;
+
+///
+/// An enum representing the state of a data ingestion operation into Kusto
+///
+public enum OperationStatus {
+    ///
+    /// Represents a temporary status.
+    /// Might change during the course of ingestion based on the
+    /// outcome of the data ingestion operation into Kusto.
+    ///
+    Pending,
+    ///
+    /// Represents a permanent status.
+    /// The data has been successfully ingested to Kusto.
+    ///
+    Succeeded,
+    ///
+    /// Represents a permanent status.
+    /// The data has not been ingested to Kusto.
+    ///
+    Failed,
+    ///
+    /// Represents a permanent status.
+    /// The data has been queued for ingestion.
+    /// (This does not indicate the ingestion was successful)
+    ///
+    Queued,
+    ///
+    /// Represents a permanent status.
+    /// No data was supplied for ingestion. The ingest operation was skipped.
+    ///
+    Skipped,
+    ///
+    /// Represents a permanent status.
+    /// Part of the data has been successfully ingested to Kusto while some failed.
+    ///
+    PartiallySucceeded
+}
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/ResourceManager.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/ResourceManager.java
new file mode 100644
index 0000000..a0e07b9
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/ResourceManager.java
@@ -0,0 +1,178 @@
+package com.microsoft.azure.kusto.kafka.connect.sink.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+// Caches the DM cluster's ingestion resources (aggregation queues, temp storage,
+// status tables) and its identity token, refreshing both once an hour on a timer;
+// consumers rotate round-robin over the cached values.
+public class ResourceManager {
+
+    // Constants:
+    private static final String SECURED_READY_FOR_AGGREGATION_QUEUE = "SecuredReadyForAggregationQueue";
+    private static final String FAILED_INGESTIONS_QUEUE = "FailedIngestionsQueue";
+    private static final String SUCCESSFUL_INGESTIONS_QUEUE = "SuccessfulIngestionsQueue";
+    private static final String TEMP_STORAGE = "TempStorage";
+    private static final String INGESTIONS_STATUS_TABLE = "IngestionsStatusTable";
+
+    // Ingestion Resources value lists:
+    private ArrayList<String> aggregationQueueList = new ArrayList<>();
+    private ArrayList<String> failedIngestionsQueueList = new ArrayList<>();
+    private ArrayList<String> successfulIngestionsQueueList = new ArrayList<>();
+    private ArrayList<String> tempStorageList = new ArrayList<>();
+    private ArrayList<String> ingestionsStatusTableList = new ArrayList<>();
+
+    //Identity Token
+    private String m_identityToken;
+
+    // Round-robin indexes:
+    private int aggregationQueueIdx = 0;
+    private int tempStorageIdx = 0;
+
+    private KustoClient m_kustoClient;
+    private final long refreshIngestionResourcesPeriod = 1000 * 60 * 60 * 1; // 1 hour
+    private Timer timer = new Timer(true);
+    private final Logger log = LoggerFactory.getLogger(ResourceManager.class);
+
+    public ResourceManager(KustoClient kustoClient) {
+        m_kustoClient = kustoClient;
+
+        TimerTask refreshIngestionResourceValuesTask = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    refreshIngestionResources();
+                } catch (Exception e) {
+                    log.error(String.format("Error in refreshIngestionResources: %s.", e.getMessage()), e);
+                }
+            }
+        };
+
+        TimerTask refreshIngestionAuthTokenTask = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    refreshIngestionAuthToken();
+                } catch (Exception e) {
+                    log.error(String.format("Error in refreshIngestionAuthToken: %s.", e.getMessage()), e);
+                }
+            }
+        };
+
+        timer.schedule(refreshIngestionAuthTokenTask, 0, refreshIngestionResourcesPeriod);
+        timer.schedule(refreshIngestionResourceValuesTask, 0, refreshIngestionResourcesPeriod);
+
+    }
+
+    public void clean() {
+        aggregationQueueList = new ArrayList<>();
+        failedIngestionsQueueList = new ArrayList<>();
+        successfulIngestionsQueueList = new ArrayList<>();
+        tempStorageList = new ArrayList<>();
+        ingestionsStatusTableList = new ArrayList<>();
+    }
+
+    public String getKustoIdentityToken() throws Exception {
+        if (m_identityToken == null) {
+            refreshIngestionAuthToken();
+            if (m_identityToken == null) {
+                throw new Exception("Unable to get Identity token");
+            }
+        }
+        return m_identityToken;
+    }
+
+    public String getStorageUri() throws Exception {
+        int arrSize = tempStorageList.size();
+        if (arrSize == 0) {
+            refreshIngestionResources();
+            arrSize = tempStorageList.size();
+            if (arrSize == 0) {
+                throw new Exception("Unable to get temp storages list");
+            }
+        }
+
+        // Round-robin over the values of tempStorageList:
+        tempStorageIdx = (tempStorageIdx + 1) % arrSize;
+        return tempStorageList.get(tempStorageIdx);
+    }
+
+    public String getAggregationQueue()
throws Exception {
+        int arrSize = aggregationQueueList.size();
+        if (arrSize == 0) {
+            refreshIngestionResources();
+            arrSize = aggregationQueueList.size();
+            if (arrSize == 0) {
+                throw new Exception("Unable to get aggregation queues list");
+            }
+        }
+
+        // Round-robin over the values of aggregationQueueList:
+        aggregationQueueIdx = (aggregationQueueIdx + 1) % arrSize;
+        return aggregationQueueList.get(aggregationQueueIdx);
+    }
+
+    private void addValue(String key, String value) {
+        switch (key) {
+            case SECURED_READY_FOR_AGGREGATION_QUEUE:
+                aggregationQueueList.add(value);
+                break;
+            case FAILED_INGESTIONS_QUEUE:
+                failedIngestionsQueueList.add(value);
+                break;
+            case SUCCESSFUL_INGESTIONS_QUEUE:
+                successfulIngestionsQueueList.add(value);
+                break;
+            case TEMP_STORAGE:
+                tempStorageList.add(value);
+                break;
+            case INGESTIONS_STATUS_TABLE:
+                ingestionsStatusTableList.add(value);
+                break;
+            default:
+                log.warn("Unrecognized key: {}", key);
+                break;
+        }
+    }
+
+
+    private void refreshIngestionResources() throws Exception {
+        log.info("Refreshing Ingestion Resources");
+        KustoResults ingestionResourcesResults = m_kustoClient.execute(CslCommandGenerator.generateIngestionResourcesShowCommand());
+        ArrayList<ArrayList<String>> values = ingestionResourcesResults.getValues();
+
+        clean();
+
+        values.forEach(pairValues -> {
+            String key = pairValues.get(0);
+            String value = pairValues.get(1);
+            addValue(key, value);
+        });
+    }
+
+    private void refreshIngestionAuthToken() throws Exception {
+        log.info("Refreshing Ingestion Auth Token");
+        KustoResults authToken = m_kustoClient.execute(CslCommandGenerator.generateKustoIdentityGetCommand());
+        m_identityToken = authToken.getValues().get(0).get(authToken.getIndexByColumnName("AuthorizationContext"));
+    }
+
+
+    public ArrayList<String> getAggregationQueueList() {
+        return aggregationQueueList;
+    }
+
+    public ArrayList<String> getFailedIngestionsQueueList() {
+        return failedIngestionsQueueList;
+    }
+
+    public ArrayList<String> getSuccessfulIngestionsQueueList() {
+        return successfulIngestionsQueueList;
+    }
+
+    public ArrayList<String> getTempStorageList() {
+        return tempStorageList;
+    }
+
+    public ArrayList<String> getIngestionsStatusTableList() {
+        return ingestionsStatusTableList;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/Sample.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/Sample.java
new file mode 100644
index 0000000..89053a4
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/Sample.java
@@ -0,0 +1,128 @@
+package com.microsoft.azure.kusto.kafka.connect.sink.client;
+
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.blob.*;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+public class Sample {
+
+    public static void main(String[] args) {
+        try {
+
+            // This will work with MS AAD tenant. If another tenant is required, set this variable to the right one
+            String authorityId = AadAuthenticationHelper.getMicrosoftAadAuthorityId();
+            //AzureStorageHelper.postMessageToQueue2("aaa", "aaa", 1);
+
+            //String engineClusterUrl = "https://.kusto.windows.net";
+            String dmClusterUrl = "https://ingest-kustoppas2.kusto.windows.net";
+
+            String databaseName = "velostrata";
+            String tableName = "tmpLogs";
+
+            //String userUsername = "";
+            //String userPassword = "";
+
+            String applicationClientId = "";
+            String applicationKey = "";
+
+            //String dmClusterUrl = "https://ingest-roil.kusto.windows.net";
+            //String userUsername = "roil@microsoft.com";
+            //String userPassword = "";
+
+            // Kusto connection string that uses the given user and password during authentication
+            //KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadUserCredentials(dmClusterUrl, userUsername, userPassword, authorityId);
+
+            // Kusto connection string that uses the given application client ID and key during authentication
+            KustoConnectionStringBuilder kcsb =
+                    KustoConnectionStringBuilder.createWithAadApplicationCredentials(dmClusterUrl, applicationClientId, applicationKey, authorityId);
+
+            // Create an instance of KustoIngestClient that will be used for executing commands against the cluster
+            KustoIngestClient kustoClient = new KustoIngestClient(kcsb);
+
+
+            // Running a basic count query on the given database and table
+            //KustoResults results = kustoClient.execute(databaseName, String.format("%s | count", tableName));
+
+            // Get a list of EventHub connection strings from the DM cluster using KustoClient
+            //KustoResults eventHubSourceSettings = kustoClient.execute(CslCommandGenerator.generateDmEventHubSourceSettingsShowCommand());
+
+            // Ingest from a single blob
+            //String blobPath = "";
+
+            String blob1 = "";
+            String blob2 = "";
+            String blob3 = "";
+
+            KustoIngestionProperties ingestionProperties = new KustoIngestionProperties(databaseName, tableName, (long) 0);
+            //kustoClient.ingestFromSingleBlob(blobPath, false, ingestionProperties);
+
+            RunVelostrataIngest(kustoClient, ingestionProperties);
+
+            //kustoClient.ingestFromMultipleBlobs(new ArrayList(Arrays.asList(blob1,blob2,blob3)), false, ingestionProperties);
+            //System.out.print("DONE !!!");
+
+            /*
+            Future task = kustoClient.ingestFromMultipleBlobsAsync(new ArrayList(Arrays.asList(blob1,blob2,blob3)),false, ingestionProperties);
+
+            System.out.print("task done = " + task.isDone());
+            Object res = task.get();
+            System.out.print("task done = " + task.isDone());
+            System.out.print("DONE !!!");
+            */
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private static void RunVelostrataIngest(KustoIngestClient kustoIngestClient, KustoIngestionProperties ingestionProperties) {
+        String containerUri = "";
+
+        try {
+            URI uri = new URI(containerUri);
+            CloudBlobContainer container = new CloudBlobContainer(uri);
+
+            ArrayList<String> blobs = new ArrayList<>();
+
+
+            CloudBlobDirectory directory = container.getDirectoryReference("ES");
+            Iterable<ListBlobItem> blobItems = directory.listBlobs();
+
+
+            StorageCredentialsSharedAccessSignature signature = (StorageCredentialsSharedAccessSignature)container.getServiceClient().getCredentials();
+            for (ListBlobItem blobItem: blobItems) {
+                blobs.add(blobItem.getUri().toString() + "?"
+ signature.getToken()); + } + + ingestionProperties.setJsonMappingName("LogsMapping1"); + ingestionProperties.setFlushImmediately(true); + + kustoIngestClient.ingestFromMultipleBlobsPaths(blobs, false, ingestionProperties); + + + System.out.print("DONE !!!"); + } + catch (Exception e) + { + e.printStackTrace(); + } + + } + + private final static SharedAccessBlobPolicy createSharedAccessPolicy(EnumSet sap, int expireTimeInSeconds) { + + Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC")); + cal.setTime(new Date()); + cal.add(Calendar.SECOND, expireTimeInSeconds); + SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy(); + policy.setPermissions(sap); + policy.setSharedAccessExpiryTime(cal.getTime()); + return policy; + + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/SampleFileIngestion.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/SampleFileIngestion.java new file mode 100644 index 0000000..dabc30a --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/SampleFileIngestion.java @@ -0,0 +1,31 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +public class SampleFileIngestion { + + private static final String appId = ""; + private static final String appKey = ""; + + public static void main(String[] args) { + try { + String kustoClusterPath = "https://ingest-.kusto.windows.net/"; + String filePath = ""; + String dbName = ""; + String tableName = ""; + String dataMappingName = ""; + String dataFormat = "json"; + + KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials(kustoClusterPath, appId, appKey); + KustoIngestionProperties ingestionProperties = new KustoIngestionProperties(dbName, tableName, (long) 0); + ingestionProperties.setJsonMappingName(dataMappingName); + + KustoIngestClient client = new KustoIngestClient(kcsb); + + for (int i = 1; i < 11; i++) { + client.ingestFromSingleFile(filePath + i + ".json", ingestionProperties); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/TableReportKustoIngestionResult.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/TableReportKustoIngestionResult.java new file mode 100644 index 0000000..9a961c6 --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/TableReportKustoIngestionResult.java @@ -0,0 +1,33 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedList; +import java.util.List; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.table.CloudTable; +import com.microsoft.azure.storage.table.TableOperation; + +public class TableReportKustoIngestionResult implements IKustoIngestionResult { + + private List m_descriptors; + + public TableReportKustoIngestionResult(List descriptors) { + m_descriptors = descriptors; + } + + @Override + public List GetIngestionStatusCollection() throws StorageException, URISyntaxException { + List results = new LinkedList<>(); + for (IngestionStatusInTableDescription descriptor : m_descriptors) { + CloudTable table = new CloudTable(new URI(descriptor.TableConnectionString)); + TableOperation operation = TableOperation.retrieve(descriptor.PartitionKey, descriptor.RowKey, + IngestionStatus.class); + results.add(table.execute(operation).getResultAsType()); + } + + 
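+        // At this point results holds one IngestionStatus row per descriptor; PartitionKey and
+        // RowKey are both the ingestion source id, as written by KustoIngestClient above.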
return results; + } + +} diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/Utils.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/Utils.java new file mode 100644 index 0000000..6c5a34a --- /dev/null +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/client/Utils.java @@ -0,0 +1,85 @@ +package com.microsoft.azure.kusto.kafka.connect.sink.client; + +import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoWebException; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.json.JSONArray; +import org.json.JSONObject; + +import java.util.ArrayList; +import java.util.HashMap; + +public class Utils { + public static boolean isNullOrEmpty(String str) { + return str == null || str.isEmpty(); + } + + public static KustoResults post(String url, String aadAccessToken, String payload) throws Exception { + HttpClient httpClient = HttpClients.createDefault(); + HttpPost httpPost = new HttpPost(url); + + // Request parameters and other properties. + StringEntity requestEntity = new StringEntity( + payload, + ContentType.APPLICATION_JSON); + + + httpPost.setEntity(requestEntity); + + httpPost.addHeader("Authorization", String.format("Bearer %s", aadAccessToken)); + httpPost.addHeader("Content-Type", "application/json"); + httpPost.addHeader("Accept-Encoding", "gzip,deflate"); + httpPost.addHeader("Fed", "True"); + httpPost.addHeader("x-ms-client-version", "Kusto.Java.Client"); + + //Execute and get the response. 
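+        // Editor's note: the parsing below assumes the v1 REST response shape used by this
+        // client, roughly:
+        //   { "Tables": [ { "Columns": [ { "ColumnName": ..., "DataType": ... }, ... ],
+        //                   "Rows": [ [ ... ], ... ] }, ... ] }
+        // and only the first table is materialized into KustoResults.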
+        HttpResponse response = httpClient.execute(httpPost);
+        HttpEntity entity = response.getEntity();
+
+        if (entity != null) {
+
+            StatusLine statusLine = response.getStatusLine();
+            String responseContent = EntityUtils.toString(entity);
+
+            if (statusLine.getStatusCode() == 200) {
+
+                JSONObject jsonObject = new JSONObject(responseContent);
+                JSONArray tablesArray = jsonObject.getJSONArray("Tables");
+                JSONObject table0 = tablesArray.getJSONObject(0);
+                JSONArray resultsColumns = table0.getJSONArray("Columns");
+
+                HashMap<String, Integer> columnNameToIndex = new HashMap<>();
+                HashMap<String, String> columnNameToType = new HashMap<>();
+                for (int i = 0; i < resultsColumns.length(); i++) {
+                    JSONObject column = resultsColumns.getJSONObject(i);
+                    String columnName = column.getString("ColumnName");
+                    columnNameToIndex.put(columnName, i);
+                    columnNameToType.put(columnName, column.getString("DataType"));
+                }
+
+                JSONArray resultsRows = table0.getJSONArray("Rows");
+                ArrayList<ArrayList<String>> values = new ArrayList<>();
+                for (int i = 0; i < resultsRows.length(); i++) {
+                    JSONArray row = resultsRows.getJSONArray(i);
+                    ArrayList<String> rowVector = new ArrayList<>();
+                    for (int j = 0; j < row.length(); j++) {
+                        rowVector.add(row.getString(j));
+                    }
+                    values.add(rowVector);
+                }
+
+                return new KustoResults(columnNameToIndex, columnNameToType, values);
+            } else {
+                throw new KustoWebException(responseContent, response);
+            }
+        }
+        return null;
+    }
+}
diff --git a/src/main/resources/azure-kusto-kafka-sink-version.properties b/src/main/resources/azure-kusto-kafka-sink-version.properties
new file mode 100644
index 0000000..e5683df
--- /dev/null
+++ b/src/main/resources/azure-kusto-kafka-sink-version.properties
@@ -0,0 +1 @@
+version=${project.version}
\ No newline at end of file
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileWriterTest.java
new file mode 100644
index 0000000..c74d1a5
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/GZIPFileWriterTest.java
@@ -0,0 +1,117 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+
+public class GZIPFileWriterTest {
+    File currentDirectory;
+
+    @Before
+    public final void before() {
+        currentDirectory = new File(Paths.get(
+                System.getProperty("java.io.tmpdir"),
+                GZIPFileWriter.class.getSimpleName(),
+                String.valueOf(Instant.now().toEpochMilli())
+        ).toString());
+    }
+
+    @After
+    public final void after() {
+        try {
+            FileUtils.deleteDirectory(currentDirectory);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testOpen() throws IOException {
+        String path = Paths.get(currentDirectory.getPath(), "testWriterOpen").toString();
+
+        File folder = new File(path);
+        folder.mkdirs();
+
+        Assert.assertEquals(folder.listFiles().length, 0);
+
+        HashMap<String, Long> files = new HashMap<>();
+
+        final String FILE_PATH = Paths.get(path, "ABC").toString();
+        final int MAX_FILE_SIZE = 128;
+
+        Consumer<GZIPFileDescriptor> trackFiles = (GZIPFileDescriptor f) -> {
+            files.put(f.path, f.rawBytes);
+        };
+
+        Supplier<String> generateFileName = () -> FILE_PATH;
+
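+        // Writer contract exercised by this test: files are created under 'path' via
+        // 'generateFileName' (the writer appends a ".gz" suffix), rolled once rawBytes
+        // exceeds MAX_FILE_SIZE, and every rolled file is handed to 'trackFiles'.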
GZIPFileWriter gzipFileWriter = new GZIPFileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName); + + gzipFileWriter.openFile(); + + Assert.assertEquals(folder.listFiles().length, 1); + Assert.assertEquals(gzipFileWriter.currentFile.rawBytes, 0); + Assert.assertEquals(gzipFileWriter.currentFile.path, FILE_PATH + ".gz"); + Assert.assertTrue(gzipFileWriter.currentFile.file.canWrite()); + + gzipFileWriter.rollback(); + } + + @Test + public void testGzipFileWriter() throws IOException { + String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter").toString(); + + File folder = new File(path); + folder.mkdirs(); + + Assert.assertEquals(folder.listFiles().length, 0); + + HashMap files = new HashMap(); + + final int MAX_FILE_SIZE = 128; + + Consumer trackFiles = (GZIPFileDescriptor f) -> { + files.put(f.path, f.rawBytes); + }; + + + Supplier generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString(); + + GZIPFileWriter gzipFileWriter = new GZIPFileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName); + + for (int i = 0; i < 9; i++) { + String msg = String.format("Line number %d : This is a message from the other size", i); + gzipFileWriter.write(msg.getBytes("UTF-8")); + } + + Assert.assertEquals(files.size(), 4); + + // should still have 1 open file at this point... + Assert.assertEquals(folder.listFiles().length, 1); + + // close current file + gzipFileWriter.close(); + Assert.assertEquals(files.size(), 5); + + List sortedFiles = new ArrayList(files.values()); + sortedFiles.sort((Long x, Long y) -> (int) (y - x)); + Assert.assertEquals(sortedFiles, Arrays.asList((long) 106, (long) 106, (long) 106, (long) 106, (long) 53)); + + // make sure folder is clear once done + Assert.assertEquals(folder.listFiles().length, 0); + } +} diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java new file mode 100644 index 0000000..2d31ed0 --- /dev/null +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java @@ -0,0 +1,300 @@ +package com.microsoft.azure.kusto.kafka.connect.sink; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class KustoSinkTaskTest { + File currentDirectory; + + @Before + public final void before() { + currentDirectory = new File(Paths.get( + System.getProperty("java.io.tmpdir"), + GZIPFileWriter.class.getName(), + String.valueOf(Instant.now().toEpochMilli()) + ).toString()); + } + + @After + public final void after() { + currentDirectory.delete(); + } + + @Test + public void testSinkTaskOpen() throws Exception { + HashMap props = new HashMap<>(); + props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net"); + props.put(KustoSinkConfig.KUSTO_DB, "db1"); + + props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;"); + props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, 
"test@test.com"); + props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!"); + + KustoSinkTask kustoSinkTask = new KustoSinkTask(); + kustoSinkTask.start(props); + ArrayList tps = new ArrayList<>(); + tps.add(new TopicPartition("topic1", 1)); + tps.add(new TopicPartition("topic1", 2)); + tps.add(new TopicPartition("topic2", 1)); + + kustoSinkTask.open(tps); + + assertEquals(kustoSinkTask.writers.size(), 3); + } + + @Test + public void testSinkTaskPutRecord() throws Exception { + HashMap props = new HashMap<>(); + props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net"); + props.put(KustoSinkConfig.KUSTO_DB, "db1"); + props.put(KustoSinkConfig.KUSTO_SINK_TEMPDIR, System.getProperty("java.io.tmpdir")); + props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;"); + props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com"); + props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!"); + + KustoSinkTask kustoSinkTask = new KustoSinkTask(); + kustoSinkTask.start(props); + + ArrayList tps = new ArrayList<>(); + TopicPartition tp = new TopicPartition("topic1", 1); + tps.add(tp); + + kustoSinkTask.open(tps); + + List records = new ArrayList(); + + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10)); + + kustoSinkTask.put(records); + + assertEquals(kustoSinkTask.writers.get(tp).currentOffset, 10); + } + + @Test + public void testSinkTaskPutRecordMissingPartition() throws Exception { + HashMap props = new HashMap<>(); + props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net"); + props.put(KustoSinkConfig.KUSTO_DB, "db1"); + + props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;"); + props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com"); + props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!"); + + KustoSinkTask kustoSinkTask = new KustoSinkTask(); + kustoSinkTask.start(props); + + ArrayList tps = new ArrayList<>(); + tps.add(new TopicPartition("topic1", 1)); + + kustoSinkTask.open(tps); + + List records = new ArrayList(); + + records.add(new SinkRecord("topic2", 1, null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10)); + + Throwable exception = assertThrows(ConnectException.class, () -> kustoSinkTask.put(records)); + + assertEquals(exception.getMessage(), "Received a record without a mapped writer for topic:partition(topic2:1), dropping record."); + + } + + @Test + public void getTopicsToTablesSingleValue() { + KustoSinkConfig mockedSinkConfig = mock(KustoSinkConfig.class); + + when(mockedSinkConfig.getKustoTable()).thenReturn("table1"); + + Map actual = KustoSinkTask.getTopicsToTables(mockedSinkConfig); + + Assert.assertEquals(actual.size(), 1); + Assert.assertEquals(actual.get(KustoSinkTask.TOPICS_WILDCARD), "table1"); + } + + @Test + public void getTopicsToTablesActualMapping() { + KustoSinkConfig mockedSinkConfig = mock(KustoSinkConfig.class); + + when(mockedSinkConfig.getKustoTopicToTableMapping()).thenReturn("topic1:table1;topic2:table2;"); + + Map actual = KustoSinkTask.getTopicsToTables(mockedSinkConfig); + + Assert.assertEquals(actual.size(), 2); + Assert.assertEquals(actual.get(KustoSinkTask.TOPICS_WILDCARD), null); + Assert.assertEquals(actual.get("topic1"), "table1"); + Assert.assertEquals(actual.get("topic2"), "table2"); + Assert.assertEquals(actual.get("topic3"), null); + } + + @Test + public void sinkStartMissingUrlOrDbOrTables() { + HashMap props = new HashMap<>(); + 
KustoSinkTask kustoSinkTask = new KustoSinkTask(); + + { + Throwable exception = assertThrows(ConnectException.class, () -> { + kustoSinkTask.start(props); + }); + + assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Missing required configuration \"kusto.url\" which has no default value."); + + } + + props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net"); + + { + Throwable exception = assertThrows(ConnectException.class, () -> { + kustoSinkTask.start(props); + }); + + assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Missing required configuration \"kusto.db\" which has no default value."); + } + + props.put(KustoSinkConfig.KUSTO_DB, "db1"); + + { + Throwable exception = assertThrows(ConnectException.class, () -> { + kustoSinkTask.start(props); + }); + + assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto table mapping must be provided."); + } + + props.put(KustoSinkConfig.KUSTO_TABLE, "table3"); + { + Throwable exception = assertThrows(ConnectException.class, () -> { + kustoSinkTask.start(props); + }); + + assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication method must be provided."); + + } + + props.remove(KustoSinkConfig.KUSTO_TABLE); + props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;"); + { + Throwable exception = assertThrows(ConnectException.class, () -> { + kustoSinkTask.start(props); + }); + + assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication method must be provided."); + } + + // check malformed table mapping throws properly + props.remove(KustoSinkConfig.KUSTO_TABLES_MAPPING); + props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1"); + { + Throwable exception = assertThrows(ConnectException.class, () -> { + kustoSinkTask.start(props); + }); + + assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Provided table mapping is malformed. please make sure table mapping is of 'topicName:tableName;' format."); + } + } + + @Test + public void sinkStartMissingAuth() { + HashMap props = new HashMap<>(); + props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net"); + props.put(KustoSinkConfig.KUSTO_DB, "db1"); + props.put(KustoSinkConfig.KUSTO_TABLE, "table3"); + props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;"); + + KustoSinkTask kustoSinkTask = new KustoSinkTask(); + + { + Throwable exception = assertThrows(ConnectException.class, () -> { + kustoSinkTask.start(props); + }); + + assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication method must be provided."); + + } + + props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com"); + + { + Throwable exception = assertThrows(ConnectException.class, () -> { + kustoSinkTask.start(props); + }); + + assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. 
Kusto authentication missing Password.");
+
+        }
+
+        props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
+
+        {
+            // should not throw any errors
+            kustoSinkTask.start(props);
+            assertNotNull(kustoSinkTask.kustoIngestClient);
+        }
+
+        props.remove(KustoSinkConfig.KUSTO_AUTH_USERNAME);
+        props.remove(KustoSinkConfig.KUSTO_AUTH_PASSWORD);
+
+        props.put(KustoSinkConfig.KUSTO_AUTH_APPID, "appid");
+
+        {
+            Throwable exception = assertThrows(ConnectException.class, () -> {
+                kustoSinkTask.start(props);
+            });
+
+            assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication missing App Key.");
+
+        }
+
+        props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY, "appkey");
+
+        {
+            // should not throw any errors
+            kustoSinkTask.start(props);
+            assertNotNull(kustoSinkTask.kustoIngestClient);
+        }
+    }
+
+    @Test
+    public void getTable() {
+        HashMap<String, String> props = new HashMap<>();
+        props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
+        props.put(KustoSinkConfig.KUSTO_DB, "db1");
+
+        props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;");
+        props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
+        props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
+
+        KustoSinkTask kustoSinkTask = new KustoSinkTask();
+        kustoSinkTask.start(props);
+        {
+            // topic-to-table mapping should resolve each topic to its configured table; unmapped topics return null
+            Assert.assertEquals(kustoSinkTask.getTable("topic1"), "table1");
+            Assert.assertEquals(kustoSinkTask.getTable("topic2"), "table2");
+            Assert.assertEquals(kustoSinkTask.getTable("topic3"), null);
+        }
+
+        // assert that single table takes precedence over mapping
+        props.put(KustoSinkConfig.KUSTO_TABLE, "table3");
+        {
+            kustoSinkTask.start(props);
+            Assert.assertEquals(kustoSinkTask.getTable("topic3"), "table3");
+        }
+    }
+}
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java
new file mode 100644
index 0000000..3a31f2c
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java
@@ -0,0 +1,210 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestClient;
+import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestionProperties;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+
+public class TopicPartitionWriterTest {
+    // TODO: should probably find a better way to mock internal class (GZIPFileWriter)...
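+    // Editor's note on the flow under test: TopicPartitionWriter buffers records through an
+    // internal GZIPFileWriter; when a file rolls (size threshold or close()), handleRollFile
+    // hands the gzipped file to KustoIngestClient.ingestFromSingleFile together with the
+    // writer's db/table as KustoIngestionProperties (see testHandleRollfile below).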
+ File currentDirectory; + + @Before + public final void before() { + currentDirectory = new File(Paths.get( + System.getProperty("java.io.tmpdir"), + GZIPFileWriter.class.getSimpleName(), + String.valueOf(Instant.now().toEpochMilli()) + ).toString()); + } + + @After + public final void after() { + try { + FileUtils.deleteDirectory(currentDirectory); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testHandleRollfile() { + TopicPartition tp = new TopicPartition("testPartition", 11); + KustoIngestClient mockedClient = mock(KustoIngestClient.class); + String db = "testdb1"; + String table = "testtable1"; + String basePath = "somepath"; + long fileThreshold = 100; + + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, db, table, basePath, fileThreshold); + + GZIPFileDescriptor descriptor = new GZIPFileDescriptor(); + descriptor.rawBytes = 1024; + descriptor.path = "somepath/somefile"; + + writer.handleRollFile(descriptor); + + + KustoIngestionProperties kustoIngestionProperties = new KustoIngestionProperties(db, table, descriptor.rawBytes); + ArgumentCaptor pathArgument = ArgumentCaptor.forClass(String.class); + ArgumentCaptor ingestionPropertiesArgumentCaptor = ArgumentCaptor.forClass(KustoIngestionProperties.class); + try { + verify(mockedClient, only()).ingestFromSingleFile(pathArgument.capture(), ingestionPropertiesArgumentCaptor.capture()); + } catch (Exception e) { + e.printStackTrace(); + } + + Assert.assertEquals(pathArgument.getValue(), descriptor.path); + Assert.assertEquals(table, ingestionPropertiesArgumentCaptor.getValue().getTableName()); + Assert.assertEquals(db, ingestionPropertiesArgumentCaptor.getValue().getDatabaseName()); + Assert.assertTrue(ingestionPropertiesArgumentCaptor.getValue().getFileSize().equals((long) 1024)); + } + + @Test + public void testGetFilename() { + TopicPartition tp = new TopicPartition("testTopic", 11); + KustoIngestClient mockClient = mock(KustoIngestClient.class); + String db = "testdb1"; + String table = "testtable1"; + String basePath = "somepath"; + long fileThreshold = 100; + + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold); + + Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_0").toString()); + } + + @Test + public void testGetFilenameAfterOffsetChanges() { + TopicPartition tp = new TopicPartition("testTopic", 11); + KustoIngestClient mockClient = mock(KustoIngestClient.class); + String db = "testdb1"; + String table = "testtable1"; + String basePath = "somepath"; + long fileThreshold = 100; + + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold); + writer.open(); + List records = new ArrayList(); + + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message", 3)); + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}", 4)); + + for (SinkRecord record : records) { + writer.writeRecord(record); + } + + Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_5").toString()); + } + + @Test + public void testOpenClose() { + TopicPartition tp = new TopicPartition("testPartition", 1); + KustoIngestClient mockClient = mock(KustoIngestClient.class); + String db = "testdb1"; + String table = "testtable1"; + String basePath = "somepath"; + long fileThreshold = 100; + + TopicPartitionWriter writer = new TopicPartitionWriter(tp, 
mockClient, db, table, basePath, fileThreshold); + writer.open(); + writer.close(); + } + + @Test + public void testWriteNonStringAndOffset() throws Exception { +// TopicPartition tp = new TopicPartition("testPartition", 11); +// KustoIngestClient mockClient = mock(KustoIngestClient.class); +// String db = "testdb1"; +// String table = "testtable1"; +// String basePath = "somepath"; +// long fileThreshold = 100; +// +// TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold); +// +// List records = new ArrayList(); +// DummyRecord dummyRecord1 = new DummyRecord(1, "a", (long) 2); +// DummyRecord dummyRecord2 = new DummyRecord(2, "b", (long) 4); +// +// records.add(new SinkRecord("topic", 1, null, null, null, dummyRecord1, 10)); +// records.add(new SinkRecord("topic", 2, null, null, null, dummyRecord2, 3)); +// records.add(new SinkRecord("topic", 2, null, null, null, dummyRecord2, 4)); +// +// for (SinkRecord record : records) { +// writer.writeRecord(record); +// } +// +// Assert.assertEquals(writer.getFilePath(), "kafka_testPartition_11_0"); + } + + @Test + public void testWriteStringyValuesAndOffset() throws Exception { + TopicPartition tp = new TopicPartition("testTopic", 2); + KustoIngestClient mockClient = mock(KustoIngestClient.class); + String db = "testdb1"; + String table = "testtable1"; + String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString(); + long fileThreshold = 100; + + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold); + + + writer.open(); + List records = new ArrayList(); + + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message", 3)); + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}", 4)); + + for (SinkRecord record : records) { + writer.writeRecord(record); + } + + Assert.assertEquals(writer.gzipFileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 3)).toString()); + } + + @Test + public void testWriteBytesValuesAndOffset() throws Exception { + TopicPartition tp = new TopicPartition("testPartition", 11); + KustoIngestClient mockClient = mock(KustoIngestClient.class); + String db = "testdb1"; + String table = "testtable1"; + String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString(); + long fileThreshold = 50; + + TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold); + + writer.open(); + List records = new ArrayList(); + + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10)); + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message".getBytes(StandardCharsets.UTF_8), 13)); + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 14)); + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 15)); + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 16)); + records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, 
"{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 17)); + + for (SinkRecord record : records) { + writer.writeRecord(record); + } + + //TODO : file threshold ignored? + Assert.assertTrue(writer.lastCommitedOffset.equals((long) 16)); + Assert.assertEquals(writer.gzipFileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 17)).toString()); + } +}