initial draft with TODOs for the next release

This commit is contained in:
Daniel Dubovski 2018-09-02 18:00:41 +03:00
Parent ade02e768a
Commit 1ddad12e05
38 changed files with 3323 additions and 0 deletions

View file

@@ -0,0 +1,23 @@
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
# Name of the connector
name=azure-kusto-sink
# Maximum number of tasks that should be created for this connector.
tasks.max=1
topics=logs
key.ignore=true
schema.ignore=true
# Kusto cluster to which the messages should be sent
#kusto.url=https://{cluster_name}.kusto.windows.net
#kusto.db={db_name}
#kusto.table={table_name}
#kusto.tables.topics_mapping=logs:logs_table;
# 10 MB
#kusto.sink.flush_size=1024*1024*10
# 10 minutes
#kusto.sink.flush_interval_ms=1000*60*10
#kusto.auth.username={my_username}
#kusto.auth.password={my_password}
# appid/secret
#kusto.auth.appid={my_app_id}
#kusto.auth.appkey={my_app_key}
#kusto.auth.authority=
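
For reference, a minimal filled-in sketch of this file using the appid/secret path might look as follows (all values are placeholders, not real endpoints or credentials; note that kusto.url and kusto.db have no defaults, so they must be set):

connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
name=azure-kusto-sink
tasks.max=1
topics=logs
kusto.url=https://{cluster_name}.kusto.windows.net
kusto.db={db_name}
kusto.tables.topics_mapping=logs:logs_table;
kusto.auth.appid={my_app_id}
kusto.auth.appkey={my_app_key}
kusto.auth.authority={aad_tenant_id}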

pom.xml (new file, 132 lines)
View file

@@ -0,0 +1,132 @@
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-kusto-kafka</artifactId>
<packaging>jar</packaging>
<version>0.0.1</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>
<name>Azure Kusto Kafka Sink</name>
<licenses>
<license>
<name>The MIT License (MIT)</name>
<url>http://opensource.org/licenses/MIT</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<url>scm:git:https://github.com/Azure/kafka-sink-azure-kusto</url>
<connection>scm:git:git://github.com/Azure/kafka-sink-azure-kusto.git</connection>
</scm>
<developers>
<developer>
<id>microsoft</id>
<name>Microsoft</name>
</developer>
</developers>
<properties>
<kafka.version>1.0.0</kafka.version>
<json.version>20090211</json.version>
<azureStorage.version>8.0.0</azureStorage.version>
<azureAdal.version>1.6.0</azureAdal.version>
<commonio.version>2.6</commonio.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.14.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.microsoft.azure/azure-storage -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>${azureStorage.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>adal4j</artifactId>
<version>${azureAdal.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${json.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commonio.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<!--FIXME:this is here only until I map this project to the actual Kusto Java SDK-->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>2.0.2-beta</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View file

@@ -0,0 +1,11 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import java.io.File;
public class GZIPFileDescriptor {
long rawBytes = 0;
long zippedBytes = 0;
long numRecords = 0;
public String path;
public File file;
}

View file

@@ -0,0 +1,132 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import java.io.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
public class GZIPFileWriter implements Closeable {
// callbacks
private Consumer<GZIPFileDescriptor> onRollCallback;
private Supplier<String> getFilePath;
public GZIPFileDescriptor currentFile;
private GZIPOutputStream gzipStream;
private String basePath;
private CountingOutputStream fileStream;
private long fileThreshold;
public GZIPFileWriter(String basePath, long fileThreshold,
Consumer<GZIPFileDescriptor> onRollCallback, Supplier<String> getFilePath) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;
this.onRollCallback = onRollCallback;
}
public boolean isDirty() {
return this.currentFile != null && this.currentFile.rawBytes > 0;
}
public synchronized void write(byte[] data) throws IOException {
if (data == null || data.length == 0) return;
if (currentFile == null) {
openFile();
}
if ((currentFile.rawBytes + data.length) > fileThreshold) {
rotate();
}
gzipStream.write(data);
currentFile.rawBytes += data.length;
currentFile.numRecords++;
}
public void openFile() throws IOException {
GZIPFileDescriptor fileDescriptor = new GZIPFileDescriptor();
File folder = new File(basePath);
if (!folder.exists() && !folder.mkdirs()) {
throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
}
String filePath = getFilePath.get() + ".gz";
fileDescriptor.path = filePath;
File file = new File(filePath);
file.createNewFile();
FileOutputStream fos = new FileOutputStream(file);
fos.getChannel().truncate(0);
fileStream = new CountingOutputStream(fos);
gzipStream = new GZIPOutputStream(fileStream);
fileDescriptor.file = file;
currentFile = fileDescriptor;
}
private void rotate() throws IOException {
finishFile();
openFile();
}
private void finishFile() throws IOException {
gzipStream.finish();
// the compressed size is only final once the stream is finished; setting it here (instead of
// accumulating the counter on every write) avoids double-counting zippedBytes
currentFile.zippedBytes = fileStream.numBytes;
onRollCallback.accept(currentFile);
// closing late so that the success callback will have a chance to use the file.
gzipStream.close();
currentFile.file.delete();
}
public void rollback() throws IOException {
if (gzipStream != null) {
gzipStream.close();
if (currentFile != null && currentFile.file != null) {
currentFile.file.delete();
}
}
}
public void close() throws IOException {
// flush and roll the last file; finishFile() already closes the stream
if (gzipStream != null) {
finishFile();
}
}
private class CountingOutputStream extends FilterOutputStream {
private long numBytes = 0;
CountingOutputStream(OutputStream out) throws IOException {
super(out);
}
@Override
public void write(int b) throws IOException {
out.write(b);
this.numBytes++;
}
@Override
public void write(byte[] b) throws IOException {
out.write(b);
this.numBytes += b.length;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
this.numBytes += len;
}
}
}
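
A minimal usage sketch of this writer (hypothetical path and threshold, not part of the commit): the roll callback receives the finished descriptor, and the supplier provides the name of each new file.

import java.nio.charset.StandardCharsets;
public class GZIPFileWriterExample {
public static void main(String[] args) throws Exception {
GZIPFileWriter writer = new GZIPFileWriter(
"/tmp/kusto-sink",                  // basePath (hypothetical)
10 * 1024 * 1024,                   // roll once ~10 MB of raw bytes accumulate
d -> System.out.printf("rolled %s: %d raw / %d zipped bytes%n",
d.path, d.rawBytes, d.zippedBytes),
() -> "/tmp/kusto-sink/records_0"); // openFile() appends ".gz"
writer.write("hello\n".getBytes(StandardCharsets.UTF_8));
writer.close(); // finishes and rolls the last file through the same callback
}
}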

View file

@@ -0,0 +1,105 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class KustoSinkConfig extends AbstractConfig {
// TODO: this might need to be per kusto cluster...
static final String KUSTO_URL = "kusto.url";
static final String KUSTO_DB = "kusto.db";
static final String KUSTO_TABLE = "kusto.table";
static final String KUSTO_TABLES_MAPPING = "kusto.tables.topics_mapping";
static final String KUSTO_AUTH_USERNAME = "kusto.auth.username";
static final String KUSTO_AUTH_PASSWORD = "kusto.auth.password";
static final String KUSTO_AUTH_APPID = "kusto.auth.appid";
static final String KUSTO_AUTH_APPKEY = "kusto.auth.appkey";
static final String KUSTO_AUTH_AUTHORITY = "kusto.auth.authority";
static final String KUSTO_SINK_TEMPDIR = "kusto.sink.tempdir";
static final String KUSTO_SINK_FLUSH_SIZE = "kusto.sink.flush_size";
static final String KUSTO_SINK_FLUSH_INTERVAL_MS = "kusto.sink.flush_interval_ms";
public KustoSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
public KustoSinkConfig(Map<String, String> parsedConfig) {
this(getConfig(), parsedConfig);
}
public static ConfigDef getConfig() {
return new ConfigDef()
.define(KUSTO_URL, Type.STRING, Importance.HIGH, "Kusto cluster url")
.define(KUSTO_DB, Type.STRING, Importance.HIGH, "Kusto target database name")
.define(KUSTO_TABLE, Type.STRING, null, Importance.HIGH, "Kusto target table (if a per-topic mapping is required, use `kusto.tables.topics_mapping` instead)")
.define(KUSTO_TABLES_MAPPING, Type.STRING, null, Importance.HIGH, "Kusto target tables mapping (per topic mapping, 'topic1:table1;topic2:table2;')")
.define(KUSTO_AUTH_USERNAME, Type.STRING, null, Importance.HIGH, "Kusto auth using username,password combo: username")
.define(KUSTO_AUTH_PASSWORD, Type.STRING, null, Importance.HIGH, "Kusto auth using username,password combo: password")
.define(KUSTO_AUTH_APPID, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: app id")
.define(KUSTO_AUTH_APPKEY, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: app key")
.define(KUSTO_AUTH_AUTHORITY, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: authority")
.define(KUSTO_SINK_TEMPDIR, Type.STRING, System.getProperty("java.io.tmpdir"), Importance.LOW, "Temp dir that will be used by the Kusto sink to buffer records; defaults to the system temp dir")
.define(KUSTO_SINK_FLUSH_SIZE, Type.LONG, FileUtils.ONE_MB, Importance.HIGH, "Kusto sink max buffer size (per topic+partition combo)")
.define(KUSTO_SINK_FLUSH_INTERVAL_MS, Type.LONG, TimeUnit.MINUTES.toMillis(5), Importance.HIGH, "Kusto sink max staleness in milliseconds (per topic+partition combo)");
}
public String getKustoUrl() {
return this.getString(KUSTO_URL);
}
public String getKustoDb() {
return this.getString(KUSTO_DB);
}
public String getKustoTable() {
return this.getString(KUSTO_TABLE);
}
public String getKustoTopicToTableMapping() {
return this.getString(KUSTO_TABLES_MAPPING);
}
public String getKustoAuthUsername() {
return this.getString(KUSTO_AUTH_USERNAME);
}
public String getKustoAuthPassword() {
return this.getString(KUSTO_AUTH_PASSWORD);
}
public String getKustoAuthAppid() {
return this.getString(KUSTO_AUTH_APPID);
}
public String getKustoAuthAppkey() {
return this.getString(KUSTO_AUTH_APPKEY);
}
public String getKustoAuthAuthority() {
return this.getString(KUSTO_AUTH_AUTHORITY);
}
public long getKustoFlushSize() {
return this.getLong(KUSTO_SINK_FLUSH_SIZE);
}
public String getKustoSinkTempDir() {
return this.getString(KUSTO_SINK_TEMPDIR);
}
public long getKustoFlushIntervalMS() {
// the property is defined as Type.LONG, so it must be read with getLong (getInt would throw at runtime)
return this.getLong(KUSTO_SINK_FLUSH_INTERVAL_MS);
}
}
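
A short sketch of how a task would build and read this config (placeholder values mirroring the sample properties file above):

Map<String, String> props = new HashMap<>();
props.put("kusto.url", "https://{cluster_name}.kusto.windows.net");
props.put("kusto.db", "{db_name}");
props.put("kusto.tables.topics_mapping", "logs:logs_table;");
KustoSinkConfig config = new KustoSinkConfig(props);
// unset properties fall back to the defaults declared in getConfig()
long flushSize = config.getKustoFlushSize();             // 1 MB default
long flushIntervalMs = config.getKustoFlushIntervalMS(); // 5 minutes default
String tempDir = config.getKustoSinkTempDir();           // system temp dir by default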

View file

@@ -0,0 +1,65 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class KustoSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(KustoSinkConnector.class);
private Map<String, String> configProps;
// Kafka Connect instantiates connectors reflectively, so a public no-arg constructor is required
public KustoSinkConnector() {
}
@Override
public String version() {
return Version.getVersion();
}
@Override
public Class<? extends Task> taskClass() {
return KustoSinkTask.class;
}
@Override
public void start(Map<String, String> props) throws ConnectException {
try {
// validate the configuration eagerly so a bad config fails the connector start
new KustoSinkConfig(props);
configProps = new HashMap<>(props);
} catch (ConfigException e) {
throw new ConnectException("Couldn't start KustoSinkConnector due to configuration error", e);
}
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
Map<String, String> taskProps = new HashMap<>(configProps);
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(taskProps);
}
return taskConfigs;
}
@Override
public void stop() throws ConnectException {
log.info("Shutting down KustoSinkConnector");
}
@Override
public ConfigDef config() {
return KustoSinkConfig.getConfig();
}
}

View file

@@ -0,0 +1,207 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
//import com.microsoft.azure.kusto.kafka.connect.source.JsonSerialization
// FIXME: need to consume this package via maven once setup properly
//import com.microsoft.azure.sdk.kusto.ingest.KustoIngestClient;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoConnectionStringBuilder;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class KustoSinkTask extends SinkTask {
static final String TOPICS_WILDCARD = "*";
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
private final Set<TopicPartition> assignment;
Map<String, String> topicsToTables;
KustoIngestClient kustoIngestClient;
String databaseName;
Map<TopicPartition, TopicPartitionWriter> writers;
private Long maxFileSize;
private String tempDir;
public KustoSinkTask() {
assignment = new HashSet<>();
writers = new HashMap<>();
}
public static KustoIngestClient createKustoIngestClient(KustoSinkConfig config) throws Exception {
if (config.getKustoAuthAppid() != null) {
if (config.getKustoAuthAppkey() == null) {
throw new ConfigException("Kusto authentication missing App Key.");
}
return new KustoIngestClient(KustoConnectionStringBuilder.createWithAadApplicationCredentials(
config.getKustoUrl(),
//todo: should replace with proper initialization
config.getKustoAuthAppid(),
config.getKustoAuthAppkey(),
config.getKustoAuthAuthority()
));
}
if (config.getKustoAuthUsername() != null) {
if (config.getKustoAuthPassword() == null) {
throw new ConfigException("Kusto authentication missing Password.");
}
return new KustoIngestClient(KustoConnectionStringBuilder.createWithAadUserCredentials(
config.getKustoUrl(),
//todo: should replace with proper initialization
config.getKustoAuthUsername(),
config.getKustoAuthPassword()
));
}
throw new ConfigException("Kusto authentication method must be provided.");
}
public static Map<String, String> getTopicsToTables(KustoSinkConfig config) {
Map<String, String> result = new HashMap<>();
if (config.getKustoTable() != null) {
result.put(TOPICS_WILDCARD, config.getKustoTable());
return result;
}
if (config.getKustoTopicToTableMapping() != null) {
String[] mappings = config.getKustoTopicToTableMapping().split(";");
for (String mapping : mappings) {
String[] kvp = mapping.split(":");
if (kvp.length != 2) {
throw new ConfigException("Provided table mapping is malformed. please make sure table mapping is of 'topicName:tableName;' format.");
}
result.put(kvp[0], kvp[1]);
}
return result;
}
throw new ConfigException("Kusto table mapping must be provided.");
}
public String getTable(String topic) {
if (topicsToTables.containsKey(TOPICS_WILDCARD)) {
return topicsToTables.get(TOPICS_WILDCARD);
}
return topicsToTables.get(topic);
}
@Override
public String version() {
return Version.getVersion();
}
@Override
public void open(Collection<TopicPartition> partitions) throws ConnectException {
assignment.addAll(partitions);
for (TopicPartition tp : assignment) {
String table = getTable(tp.topic());
if (table == null) {
throw new ConnectException(String.format("Kusto Sink has no table mapped for the topic: %s. please check your configuration.", tp.topic()));
} else {
TopicPartitionWriter writer = new TopicPartitionWriter(tp, kustoIngestClient, databaseName, table, tempDir, maxFileSize);
writer.open();
writers.put(tp, writer);
}
}
}
@Override
public void close(Collection<TopicPartition> partitions) {
for (TopicPartition tp : assignment) {
try {
writers.get(tp).close();
} catch (ConnectException e) {
log.error("Error closing writer for {}. Error: {}", tp, e.getMessage());
}
}
writers.clear();
assignment.clear();
}
@Override
public void start(Map<String, String> props) throws ConnectException {
try {
KustoSinkConfig config = new KustoSinkConfig(props);
databaseName = config.getKustoDb();
topicsToTables = getTopicsToTables(config);
kustoIngestClient = createKustoIngestClient(config);
tempDir = config.getKustoSinkTempDir();
maxFileSize = config.getKustoFlushSize();
open(context.assignment());
} catch (ConfigException ex) {
throw new ConnectException(String.format("Kusto Connector failed to start due to configuration error. %s", ex.getMessage()), ex);
} catch (Exception e) {
// surface unexpected startup failures instead of swallowing them
throw new ConnectException("Kusto Connector failed to start.", e);
}
}
@Override
public void stop() throws ConnectException {
for (TopicPartitionWriter writer : writers.values()) {
writer.close();
}
}
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
for (SinkRecord record : records) {
TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
TopicPartitionWriter writer = writers.get(tp);
if (writer == null) {
throw new NotFoundException(String.format("Received a record without a mapped writer for topic:partition (%s:%d).", tp.topic(), tp.partition()));
}
writer.writeRecord(record);
}
}
// Rolling files commit whenever they roll, so offsets may drift from what Kafka expects;
// preCommit re-syncs the topic-partition offsets with what the sink has actually committed.
@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets
) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : assignment) {
Long offset = writers.get(tp).lastCommittedOffset;
if (offset != null) {
log.trace("Forwarding to framework request to commit offset: {} for {}", offset, tp);
offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
}
}
return offsetsToCommit;
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws ConnectException {
// do nothing , rolling files can handle writing
}
}
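
The topic-to-table resolution above accepts either a single kusto.table (stored under the * wildcard) or a ';'-separated per-topic mapping; a small sketch of the second path:

Map<String, String> props = new HashMap<>();
props.put("kusto.url", "https://{cluster_name}.kusto.windows.net");
props.put("kusto.db", "{db_name}");
props.put("kusto.tables.topics_mapping", "logs:logs_table;metrics:metrics_table");
Map<String, String> mapping = KustoSinkTask.getTopicsToTables(new KustoSinkConfig(props));
mapping.get("logs");    // "logs_table"
mapping.get("metrics"); // "metrics_table"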

View file

@@ -0,0 +1,94 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestClient;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestionProperties;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
public class TopicPartitionWriter {
GZIPFileWriter gzipFileWriter;
TopicPartition tp;
KustoIngestClient client;
String database;
String table;
String basePath;
long fileThreshold;
long currentOffset;
Long lastCommittedOffset;
TopicPartitionWriter(
TopicPartition tp, KustoIngestClient client,
String database, String table,
String basePath, long fileThreshold
) {
this.tp = tp;
this.client = client;
this.table = table;
this.fileThreshold = fileThreshold;
this.database = database;
this.basePath = basePath;
this.currentOffset = 0;
}
public void handleRollFile(GZIPFileDescriptor fileDescriptor) {
KustoIngestionProperties properties = new KustoIngestionProperties(database, table, fileDescriptor.rawBytes);
try {
client.ingestFromSingleFile(fileDescriptor.path, properties);
lastCommittedOffset = currentOffset;
} catch (Exception e) {
// the offset is not advanced on failure, so these records will be re-sent
e.printStackTrace();
}
}
public String getFilePath() {
long nextOffset = gzipFileWriter != null && gzipFileWriter.isDirty() ? currentOffset + 1 : currentOffset;
return Paths.get(basePath, String.format("kafka_%s_%s_%d", tp.topic(), tp.partition(), nextOffset)).toString();
}
public void writeRecord(SinkRecord record) {
byte[] value;
// todo: should probably handle more schemas
if (record.valueSchema() == null || record.valueSchema() == Schema.STRING_SCHEMA) {
value = record.value().toString().getBytes(StandardCharsets.UTF_8);
} else if (record.valueSchema() == Schema.BYTES_SCHEMA) {
value = (byte[]) record.value();
} else {
// the draft threw and immediately caught an Exception here; fail the record properly instead
throw new ConnectException("Unexpected value type, can only handle strings or bytes");
}
try {
currentOffset = record.kafkaOffset();
gzipFileWriter.write(value);
} catch (IOException e) {
e.printStackTrace();
}
}
public void open() {
gzipFileWriter = new GZIPFileWriter(basePath, fileThreshold, this::handleRollFile, this::getFilePath);
}
public void close() {
try {
gzipFileWriter.rollback();
} catch (IOException e) {
e.printStackTrace();
}
}
}
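
Taken together, the writer's lifecycle as the task drives it (hypothetical values; kustoIngestClient and record stand in for objects created elsewhere, e.g. via KustoSinkTask.createKustoIngestClient):

TopicPartition tp = new TopicPartition("logs", 0);
TopicPartitionWriter writer = new TopicPartitionWriter(
tp, kustoIngestClient, "{db_name}", "logs_table", "/tmp/kusto-sink", 10 * 1024 * 1024);
writer.open();              // creates the underlying GZIPFileWriter
writer.writeRecord(record); // buffers records into files named kafka_logs_0_{offset}.gz
writer.close();             // discards any in-progress file via rollback()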

View file

@@ -0,0 +1,29 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.util.Properties;
public class Version {
private static final Logger log = LoggerFactory.getLogger(Version.class);
private static final String VERSION_FILE = "/azure-kusto-kafka-sink-version.properties";
private static String version = "unknown";
static {
try {
Properties props = new Properties();
try (InputStream versionFileStream = Version.class.getResourceAsStream(VERSION_FILE)) {
props.load(versionFileStream);
version = props.getProperty("version", version).trim();
}
} catch (Exception e) {
log.warn("Error while loading version:", e);
}
}
public static String getVersion() {
return version;
}
}
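
Since the pom enables resource filtering on src/main/resources, the version resource this class reads could be a one-line filtered properties file; a sketch of src/main/resources/azure-kusto-kafka-sink-version.properties (assumed, not shown in this commit):

version=${project.version}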

View file

@@ -0,0 +1,88 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import com.microsoft.aad.adal4j.*;
import javax.naming.ServiceUnavailableException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class AadAuthenticationHelper {
private final static String c_microsoftAadTenantId = "72f988bf-86f1-41af-91ab-2d7cd011db47";
private final static String c_kustoClientId = "ad30ae9e-ac1b-4249-8817-d24f5d7ad3de";
private ClientCredential m_clientCredential;
private String m_userUsername;
private String m_userPassword;
private String m_clusterUrl;
private String m_aadAuthorityId;
private String m_aadAuthorityUri;
public static String getMicrosoftAadAuthorityId() { return c_microsoftAadTenantId; }
public AadAuthenticationHelper(KustoConnectionStringBuilder kcsb) throws Exception{
m_clusterUrl = kcsb.getClusterUrl();
if (!Utils.isNullOrEmpty(kcsb.getApplicationClientId()) && !Utils.isNullOrEmpty(kcsb.getApplicationKey())) {
m_clientCredential = new ClientCredential(kcsb.getApplicationClientId(), kcsb.getApplicationKey());
} else {
m_userUsername = kcsb.getUserUsername();
m_userPassword = kcsb.getUserPassword();
}
// Set the AAD Authority URI
m_aadAuthorityId = (kcsb.getAuthorityId() == null ? c_microsoftAadTenantId : kcsb.getAuthorityId());
m_aadAuthorityUri = "https://login.microsoftonline.com/" + m_aadAuthorityId + "/oauth2/authorize";
}
public String acquireAccessToken() throws Exception {
if (m_clientCredential != null){
return acquireAadApplicationAccessToken().getAccessToken();
} else {
return acquireAadUserAccessToken().getAccessToken();
}
}
private AuthenticationResult acquireAadUserAccessToken() throws Exception {
AuthenticationContext context = null;
AuthenticationResult result = null;
ExecutorService service = null;
try {
service = Executors.newFixedThreadPool(1);
context = new AuthenticationContext(m_aadAuthorityUri, true, service);
Future<AuthenticationResult> future = context.acquireToken(
m_clusterUrl, c_kustoClientId, m_userUsername, m_userPassword,
null);
result = future.get();
} finally {
service.shutdown();
}
if (result == null) {
throw new ServiceUnavailableException("acquireAadUserAccessToken got 'null' authentication result");
}
return result;
}
private AuthenticationResult acquireAadApplicationAccessToken() throws Exception {
AuthenticationContext context = null;
AuthenticationResult result = null;
ExecutorService service = null;
try {
service = Executors.newFixedThreadPool(1);
context = new AuthenticationContext(m_aadAuthorityUri, true, service);
Future<AuthenticationResult> future = context.acquireToken(m_clusterUrl, m_clientCredential, null);
result = future.get();
} finally {
service.shutdown();
}
if (result == null) {
throw new ServiceUnavailableException("acquireAadApplicationAccessToken got 'null' authentication result");
}
return result;
}
}
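
A sketch of acquiring a token with this helper (placeholder credentials; the factory methods are defined in KustoConnectionStringBuilder below):

KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials(
"https://{cluster_name}.kusto.windows.net", "{my_app_id}", "{my_app_key}", "{aad_tenant_id}");
AadAuthenticationHelper helper = new AadAuthenticationHelper(kcsb);
String token = helper.acquireAccessToken(); // uses the application flow because a ClientCredential was built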

View file

@@ -0,0 +1,144 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobOutputStream;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.TableOperation;
import com.microsoft.azure.storage.table.TableServiceEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.InvalidKeyException;
import java.util.zip.GZIPOutputStream;
public class AzureStorageHelper {
private static final Logger log = LoggerFactory.getLogger(AzureStorageHelper.class);
private static final int GZIP_BUFFER_SIZE = 16384;
public static void postMessageToQueue(String queuePath, String content) throws Exception {
try
{
CloudQueue queue = new CloudQueue(new URI(queuePath));
CloudQueueMessage queueMessage = new CloudQueueMessage(content);
queue.addMessage(queueMessage);
}
catch (Exception e)
{
log.error(String.format("postMessageToQueue: %s.",e.getMessage()), e);
throw e;
}
}
private static void postMessageStorageAccountNotWorkingYet(String content) throws URISyntaxException, InvalidKeyException, StorageException {
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount = CloudStorageAccount.parse("");
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Create the queue if it doesn't already exist.
queue.createIfNotExists();
// Create a message and add it to the queue.
CloudQueueMessage message = new CloudQueueMessage(content);
queue.addMessage(message);
}
// FIXME: leftover debug helper; ignores its parameters and posts a canned message using hardcoded account details
public static void postMessageToQueue2(String queueName, String content, int invisibleTimeInSeconds) throws Exception {
try {
final String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=6txkstrldaaoa01;AccountKey=****;EndpointSuffix=core.windows.net";
final CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
final CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
CloudQueue queue = queueClient.getQueueReference("readyforaggregation");
CloudQueueMessage cloudQueueMessage = new CloudQueueMessage("{\"Id\":\"bfe6ae6a-3582-42fd-b871-f2749cda62d7\",\"BlobPath\":\"https://6txkstrldaaoa01.blob.core.windows.net/8up-20171106-temp-e5c334ee145d4b43a3a2d3a96fbac1df/41ef7c03-851d-461e-91eb-34fc91e6a789?sig=gLnw2ZXjh8D3hitwfQuVRcOz7QCMm3S3msLLKRmyTvY%3D&st=2017-11-07T05%3A08%3A05Z&se=2017-11-09T05%3A08%3A05Z&sv=2017-04-17&si=DownloadPolicy&sp=rwd&sr=b\",\"RawDataSize\":1645,\"DatabaseName\":\"AdobeAnalytics\",\"TableName\":\"RonylTest\",\"RetainBlobOnSuccess\":false,\"Format\":\"tsv\",\"FlushImmediately\":false,\"ReportLevel\":0}");
queue.addMessage(cloudQueueMessage);
} catch (Exception e) {
// Output the stack trace.
e.printStackTrace();
}
}
public static void azureTableInsertEntity(String tableUri, TableServiceEntity entity) throws StorageException, URISyntaxException {
CloudTable table = new CloudTable(new URI(tableUri));
// Create an operation to add the new customer to the table basics table.
TableOperation insert = TableOperation.insert(entity);
// Submit the operation to the table service.
table.execute(insert);
}
public static CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws Exception{
try {
log.debug(String.format("uploadLocalFileToBlob: filePath: %s, blobName: %s, storageUri: %s", filePath, blobName, storageUri));
// Check if the file is already compressed:
boolean isCompressed = filePath.endsWith(".gz") || filePath.endsWith(".zip");
CloudBlobContainer container = new CloudBlobContainer(new URI(storageUri));
File sourceFile = new File(filePath);
//Getting a blob reference
CloudBlockBlob blob;
if(!isCompressed)
{
blob = container.getBlockBlobReference(blobName+".gz");
InputStream fin = Files.newInputStream(Paths.get(filePath));
BlobOutputStream bos = blob.openOutputStream();
GZIPOutputStream gzout = new GZIPOutputStream(bos);
byte[] buffer = new byte[GZIP_BUFFER_SIZE];
int length;
while ((length = fin.read(buffer)) > 0) {
gzout.write(buffer, 0, length);
}
gzout.close();
fin.close();
} else {
blob = container.getBlockBlobReference(blobName);
blob.uploadFromFile(sourceFile.getAbsolutePath());
}
return blob;
}
catch (StorageException se)
{
log.error(String.format("uploadLocalFileToBlob: Error returned from the service. Http code: %d and error code: %s", se.getHttpStatusCode(), se.getErrorCode()), se);
throw se;
}
catch (Exception ex)
{
log.error("uploadLocalFileToBlob: Error while uploading file to blob.", ex);
throw ex;
}
}
public static String getBlobPathWithSas(CloudBlockBlob blob) {
StorageCredentialsSharedAccessSignature signature = (StorageCredentialsSharedAccessSignature)blob.getServiceClient().getCredentials();
return blob.getStorageUri().getPrimaryUri().toString() + "?" + signature.getToken();
}
}
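
A sketch of the upload/SAS path that KustoIngestClient.ingestFromSingleFile uses below (the container URI is a placeholder SAS-authorized URI):

CloudBlockBlob blob = AzureStorageHelper.uploadLocalFileToBlob(
"/tmp/kusto-sink/records_0.gz", // already gzipped, so it is uploaded as-is
"records_0.gz",
"https://{account}.blob.core.windows.net/{container}?{sas_token}");
String blobPathWithSas = AzureStorageHelper.getBlobPathWithSas(blob);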

View file

@@ -0,0 +1,49 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import java.util.UUID;
public class BlobDescription {
private String m_blobPath;
private Long m_blobSize;
private UUID m_sourceId;
public String getBlobPath()
{
return m_blobPath;
}
public void setBlobPath(String blobPath)
{
m_blobPath = blobPath;
}
public Long getBlobSize()
{
return m_blobSize;
}
public void setBlobSize(Long blobSize)
{
m_blobSize = blobSize;
}
public UUID getSourceId()
{
return m_sourceId;
}
public void setSourceId(UUID sourceId)
{
m_sourceId = sourceId;
}
public BlobDescription()
{
}
public BlobDescription(String blobPath, Long blobSize)
{
m_blobPath = blobPath;
m_blobSize = blobSize;
}
}

View file

@@ -0,0 +1,19 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
public class CslCommandGenerator {
public static String generateDmEventHubSourceSettingsShowCommand() {
return ".show EventHub ingestion sources settings";
}
public static String generateIngestionResourcesShowCommand() {
return ".show ingestion resources";
}
public static String generateKustoIdentityGetCommand() {
return ".get kusto identity token";
}
}

View file

@@ -0,0 +1,14 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import java.net.URISyntaxException;
import java.util.List;
import com.microsoft.azure.storage.StorageException;
public interface IKustoIngestionResult {
/**
 * Retrieves the detailed ingestion status of all data ingestion operations
 * into Kusto associated with this IKustoIngestionResult instance.
 */
List<IngestionStatus> GetIngestionStatusCollection() throws StorageException, URISyntaxException;
}

View file

@@ -0,0 +1,79 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoClientAggregateException;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoClientException;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.codehaus.jackson.map.ObjectMapper;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
public class IngestFromMultipleBlobsCallable implements Callable<Object> {
private final String m_ingestionQueueUri;
private List<String> m_blobPaths;
private Boolean m_deleteSourceOnSuccess;
private KustoIngestionProperties m_ingestionProperties;
public IngestFromMultipleBlobsCallable(List<String> blobPaths, Boolean deleteSourceOnSuccess, KustoIngestionProperties ingestionProperties, final String ingestionQueueUri) {
m_blobPaths = blobPaths;
m_deleteSourceOnSuccess = deleteSourceOnSuccess;
m_ingestionProperties = ingestionProperties;
m_ingestionQueueUri = ingestionQueueUri;
}
@Override
public Object call() throws Exception {
if (m_blobPaths == null || m_blobPaths.size() == 0) {
throw new KustoClientException("blobs must have at least 1 path");
}
List<KustoClientException> ingestionErrors = new LinkedList<KustoClientException>();
for (String blobPath : m_blobPaths) {
try {
// Create the ingestion message
IngestionBlobInfo ingestionBlobInfo = new IngestionBlobInfo(blobPath, m_ingestionProperties.getDatabaseName(), m_ingestionProperties.getTableName());
ingestionBlobInfo.rawDataSize = estimateBlobRawSize(blobPath);
ingestionBlobInfo.retainBlobOnSuccess = !m_deleteSourceOnSuccess;
ingestionBlobInfo.reportLevel = m_ingestionProperties.getReportLevel();
ingestionBlobInfo.reportMethod = m_ingestionProperties.getReportMethod();
ingestionBlobInfo.flushImmediately = m_ingestionProperties.getFlushImmediately();
ingestionBlobInfo.additionalProperties = m_ingestionProperties.getAdditionalProperties();
ObjectMapper objectMapper = new ObjectMapper();
String serializedIngestionBlobInfo = objectMapper.writeValueAsString(ingestionBlobInfo);
AzureStorageHelper.postMessageToQueue(m_ingestionQueueUri, serializedIngestionBlobInfo);
} catch (Exception ex) {
ingestionErrors.add(new KustoClientException(blobPath, "fail to post message to queue", ex));
}
}
if (ingestionErrors.size() > 0) {
throw new KustoClientAggregateException(ingestionErrors);
}
return null;
}
private Long estimateBlobRawSize(String blobPath) throws Exception {
try {
CloudBlockBlob blockBlob = new CloudBlockBlob(new URI(blobPath));
blockBlob.downloadAttributes();
long length = blockBlob.getProperties().getLength();
if (length == 0) {
return null;
}
return length;
} catch (Exception e) {
return null;
}
}
}
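
Since this is a Callable, the intended usage (currently only sketched in commented-out code in KustoIngestClient below) would be to submit it to an executor; a sketch with hypothetical blobPaths, ingestionProperties, and ingestionQueueUri locals:

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Object> pending = executor.submit(new IngestFromMultipleBlobsCallable(
blobPaths,           // List<String> of SAS-authorized blob URIs
false,               // retain source blobs on success
ingestionProperties, // target database/table and report settings
ingestionQueueUri)); // ingestion queue URI obtained from the DM
pending.get();       // rethrows KustoClientAggregateException if any blob failed to enqueue
executor.shutdown();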

View file

@@ -0,0 +1,30 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import java.util.Map;
import java.util.UUID;
final public class IngestionBlobInfo {
public String blobPath;
public Long rawDataSize;
public String databaseName;
public String tableName;
public UUID id;
public Boolean retainBlobOnSuccess;
public KustoIngestionProperties.IngestionReportLevel reportLevel;
public KustoIngestionProperties.IngestionReportMethod reportMethod;
public Boolean flushImmediately;
public IngestionStatusInTableDescription IngestionStatusInTable;
public Map<String, String> additionalProperties;
public IngestionBlobInfo(String blobPath, String databaseName, String tableName) {
this.blobPath = blobPath;
this.databaseName = databaseName;
this.tableName = tableName;
id = UUID.randomUUID();
retainBlobOnSuccess = false;
flushImmediately = false;
reportLevel = KustoIngestionProperties.IngestionReportLevel.FailuresOnly;
reportMethod = KustoIngestionProperties.IngestionReportMethod.Queue;
}
}

View file

@@ -0,0 +1,218 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
public enum IngestionErrorCode {
/** Unknown error occurred. */
Unknown,
/** Low memory condition. */
Stream_LowMemoryCondition,
/** Wrong number of fields. */
Stream_WrongNumberOfFields,
/** Input stream/record/field too large. */
Stream_InputStreamTooLarge,
/** No data streams to ingest. */
Stream_NoDataToIngest,
/** Invalid csv format - closing quote missing. */
Stream_ClosingQuoteMissing,
/** Failed to download source from Azure storage - source not found. */
Download_SourceNotFound,
/** Failed to download source from Azure storage - access condition not satisfied. */
Download_AccessConditionNotSatisfied,
/** Failed to download source from Azure storage - access forbidden. */
Download_Forbidden,
/** Failed to download source from Azure storage - account not found. */
Download_AccountNotFound,
/** Failed to download source from Azure storage - bad request. */
Download_BadRequest,
/** Failed to download source from Azure storage - not transient error. */
Download_NotTransient,
/** Failed to download source from Azure storage - unknown error. */
Download_UnknownError,
/** Failed to invoke update policy. Query schema does not match table schema. */
UpdatePolicy_QuerySchemaDoesNotMatchTableSchema,
/** Failed to invoke update policy. Failed descendant transactional update policy. */
UpdatePolicy_FailedDescendantTransaction,
/** Failed to invoke update policy. Ingestion error occurred. */
UpdatePolicy_IngestionError,
/** Failed to invoke update policy. Unknown error occurred. */
UpdatePolicy_UnknownError,
/** Json pattern was not ingested with jsonMapping parameter. */
BadRequest_MissingJsonMappingtFailure,
/** Blob is invalid or empty zip archive. */
BadRequest_InvalidOrEmptyBlob,
/** Database does not exist. */
BadRequest_DatabaseNotExist,
/** Table does not exist. */
BadRequest_TableNotExist,
/** Invalid kusto identity token. */
BadRequest_InvalidKustoIdentityToken,
/** Blob path without SAS from unknown blob storage. */
BadRequest_UriMissingSas,
/** File too large. */
BadRequest_FileTooLarge,
/** No valid reply from ingest command. */
BadRequest_NoValidResponseFromEngine,
/** Access to table is denied. */
BadRequest_TableAccessDenied,
/** Message is exhausted. */
BadRequest_MessageExhausted,
/** Bad request. */
General_BadRequest,
/** Internal server error occurred. */
General_InternalServerError,
/** Failed to invoke update policy. Cyclic update is not allowed. */
UpdatePolicy_Cyclic_Update_Not_Allowed,
/** Failed to invoke update policy. Transactional update policy is not allowed in streaming ingestion. */
UpdatePolicy_Transactional_Not_Allowed_In_Streaming_Ingestion,
/** Failed to parse csv mapping. */
BadRequest_InvalidCsvMapping,
/** Invalid mapping reference. */
BadRequest_InvalidMappingReference,
/** Mapping reference wasn't found. */
BadRequest_MappingReferenceWasNotFound,
/** Failed to parse json mapping. */
BadRequest_InvalidJsonMapping,
/** Format is not supported. */
BadRequest_FormatNotSupported,
/** Ingestion properties contain both an ingestion mapping and an ingestion mapping reference. */
BadRequest_DuplicateMapping,
/** Message is corrupted. */
BadRequest_CorruptedMessage,
/** Inconsistent ingestion mapping. */
BadRequest_InconsistentMapping,
/** Syntax error. */
BadRequest_SyntaxError,
/** Abandoned ingestion. */
General_AbandonedIngestion,
/** Throttled ingestion. */
General_ThrottledIngestion,
/** Schema of target table at start time doesn't match the one at commit time. */
General_TransientSchemaMismatch,
}

View file

@@ -0,0 +1,35 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import java.util.Date;
import java.util.UUID;
public class IngestionFailureInfo {
public UUID OperationId;
public String Database;
public String Table;
public Date FailedOn;
public UUID IngestionSourceId;
public String IngestionSourcePath;
public String Details;
public FailureStatusValue FailureStatus;
public UUID RootActivityId;
public Boolean OriginatesFromUpdatePolicy;
public enum FailureStatusValue {
Unknown(0),
Permanent(1),
Transient(2),
Exhausted(3);
private final int value;
FailureStatusValue(int v) {
value = v;
}
public int getValue() {
return value;
}
}
}

View file

@@ -0,0 +1,185 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import java.util.Date;
import java.util.UUID;
import com.microsoft.azure.storage.table.TableServiceEntity;
/**
 * This class represents an ingestion status.
 * <p>
 * Any change to this class must be made in a backwards/forwards-compatible manner.
 */
public class IngestionStatus extends TableServiceEntity {
/**
 * The updated status of the ingestion. The ingestion status will be 'Pending' during the
 * ingestion's process and will be updated as soon as the ingestion completes.
 */
public OperationStatus status;
public String getStatus() {
return status.toString();
}
public void setStatus(String s) {
status = OperationStatus.valueOf(s);
}
/**
 * A unique identifier representing the ingested source. Can be supplied during the ingestion execution.
 */
public UUID ingestionSourceId;
public UUID getIngestionSourceId() {
return ingestionSourceId;
}
public void setIngestionSourceId(UUID id) {
ingestionSourceId = id;
}
/**
 * The URI of the blob, potentially including the secret needed to access the blob.
 * This can be a file system URI (on-premises deployments only), or an Azure Blob
 * Storage URI (including a SAS key or a semicolon followed by the account key).
 */
public String ingestionSourcePath;
public String getIngestionSourcePath() {
return ingestionSourcePath;
}
public void setIngestionSourcePath(String path) {
ingestionSourcePath = path;
}
/**
 * The name of the database holding the target table.
 */
public String database;
public String getDatabase() {
return database;
}
public void setDatabase(String db) {
database = db;
}
/**
 * The name of the target table into which the data will be ingested.
 */
public String table;
public String getTable() {
return table;
}
public void setTable(String t) {
table = t;
}
/**
 * The last updated time of the ingestion status.
 */
public Date updatedOn;
public Date getUpdatedOn() {
return updatedOn;
}
public void setUpdatedOn(Date lastUpdated) {
updatedOn = lastUpdated;
}
/**
 * The ingestion's operation Id.
 */
public UUID operationId;
public UUID getOperationId() {
return operationId;
}
public void setOperationId(UUID id) {
operationId = id;
}
/**
 * The ingestion's activity Id.
 */
public UUID activityId;
public UUID getActivityId() {
return activityId;
}
public void setActivityId(UUID id) {
activityId = id;
}
/**
 * In case of a failure - indicates the failure's error code.
 */
public IngestionErrorCode errorCode;
public String getErrorCode() {
return (errorCode != null ? errorCode : IngestionErrorCode.Unknown).toString();
}
public void setErrorCode(String code) {
errorCode = code == null ? IngestionErrorCode.Unknown : IngestionErrorCode.valueOf(code);
}
/**
 * In case of a failure - indicates the failure's status.
 */
public IngestionFailureInfo.FailureStatusValue failureStatus;
public String getFailureStatus() {
return (failureStatus != null ? failureStatus : IngestionFailureInfo.FailureStatusValue.Unknown).toString();
}
public void setFailureStatus(String status) {
failureStatus = IngestionFailureInfo.FailureStatusValue.valueOf(status);
}
/**
 * In case of a failure - indicates whether or not the failures originate from an Update Policy.
 */
public boolean originatesFromUpdatePolicy;
public boolean getOriginatesFromUpdatePolicy() {
return originatesFromUpdatePolicy;
}
public void setOriginatesFromUpdatePolicy(boolean fromUpdatePolicy) {
originatesFromUpdatePolicy = fromUpdatePolicy;
}
public IngestionStatus() {
}
public IngestionStatus(UUID uuid) {
super(uuid.toString(), uuid.toString());
}
}

View file

@@ -0,0 +1,7 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
public class IngestionStatusInTableDescription {
public String TableConnectionString;
public String PartitionKey;
public String RowKey;
}

View file

@@ -0,0 +1,42 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import org.json.JSONObject;
public class KustoClient {
private final String c_adminCommandsPrefix = ".";
private final String c_apiVersion = "v1";
private final String c_defaultDatabaseName = "NetDefaultDb";
private AadAuthenticationHelper m_aadAuthenticationHelper;
private String m_clusterUrl;
public KustoClient(KustoConnectionStringBuilder kcsb) throws Exception {
m_clusterUrl = kcsb.getClusterUrl();
m_aadAuthenticationHelper = new AadAuthenticationHelper(kcsb);
}
public KustoResults execute(String command) throws Exception {
return execute(c_defaultDatabaseName, command);
}
public KustoResults execute(String database, String command) throws Exception {
String clusterEndpoint;
if (command.startsWith(c_adminCommandsPrefix)) {
clusterEndpoint = String.format("%s/%s/rest/mgmt", m_clusterUrl, c_apiVersion);
} else {
clusterEndpoint = String.format("%s/%s/rest/query", m_clusterUrl, c_apiVersion);
}
return execute(database, command, clusterEndpoint);
}
private KustoResults execute(String database, String command, String clusterEndpoint) throws Exception {
String aadAccessToken = m_aadAuthenticationHelper.acquireAccessToken();
String jsonString = new JSONObject()
.put("db", database)
.put("csl", command).toString();
return Utils.post(clusterEndpoint, aadAccessToken, jsonString);
}
}
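
A sketch combining this client with CslCommandGenerator, as the ingest client does below (kcsb stands in for a KustoConnectionStringBuilder built elsewhere):

KustoClient client = new KustoClient(kcsb);
// commands starting with '.' are routed to the management endpoint, everything else to the query endpoint
KustoResults resources = client.execute(CslCommandGenerator.generateIngestionResourcesShowCommand());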

View file

@@ -0,0 +1,15 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions;
import java.util.List;
public class KustoClientAggregateException extends Exception{
List<KustoClientException> m_kustoClientExceptions;
public List<KustoClientException> getExceptions() { return m_kustoClientExceptions; }
public KustoClientAggregateException(List<KustoClientException> kustoClientExceptions)
{
m_kustoClientExceptions = kustoClientExceptions;
}
}

View file

@@ -0,0 +1,20 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions;
public class KustoClientException extends Exception {
private String m_ingestionSource;
public String getIngestionSource() { return m_ingestionSource; }
public KustoClientException(String message) {
super(message);
}
public KustoClientException(String message, Exception exception) {
super(message, exception);
}
public KustoClientException(String ingestionSource, String message, Exception exception) {
super(message, exception);
m_ingestionSource = ingestionSource;
}
}

View file

@@ -0,0 +1,18 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions;
import org.apache.http.HttpResponse;
public class KustoWebException extends Exception{
private String m_message;
private HttpResponse m_httpResponse;
public String getMessage() { return m_message; }
public HttpResponse getHttpResponse() { return m_httpResponse; }
public KustoWebException(String message, HttpResponse httpResponse) {
m_message = message;
m_httpResponse = httpResponse;
}
}

View file

@@ -0,0 +1,66 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
public class KustoConnectionStringBuilder {
private String m_clusterUri;
private String m_username;
private String m_password;
private String m_applicationClientId;
private String m_applicationKey;
private String m_aadAuthorityId; // AAD tenant Id (GUID)
public String getClusterUrl() { return m_clusterUri; }
public String getUserUsername() { return m_username; }
public String getUserPassword() { return m_password; }
public String getApplicationClientId() { return m_applicationClientId; }
public String getApplicationKey() { return m_applicationKey; }
public String getAuthorityId() { return m_aadAuthorityId; }
private KustoConnectionStringBuilder(String resourceUri)
{
m_clusterUri = resourceUri;
m_username = null;
m_password = null;
m_applicationClientId = null;
m_applicationKey = null;
m_aadAuthorityId = null;
}
public static KustoConnectionStringBuilder createWithAadUserCredentials(String resourceUri,
String username,
String password,
String authorityId)
{
KustoConnectionStringBuilder kcsb = new KustoConnectionStringBuilder(resourceUri);
kcsb.m_username = username;
kcsb.m_password = password;
kcsb.m_aadAuthorityId = authorityId;
return kcsb;
}
public static KustoConnectionStringBuilder createWithAadUserCredentials(String resourceUri,
String username,
String password)
{
return createWithAadUserCredentials(resourceUri, username, password, null);
}
public static KustoConnectionStringBuilder createWithAadApplicationCredentials(String resourceUri,
String applicationClientId,
String applicationKey,
String authorityId)
{
KustoConnectionStringBuilder kcsb = new KustoConnectionStringBuilder(resourceUri);
kcsb.m_applicationClientId = applicationClientId;
kcsb.m_applicationKey = applicationKey;
kcsb.m_aadAuthorityId = authorityId;
return kcsb;
}
public static KustoConnectionStringBuilder createWithAadApplicationCredentials(String resourceUri,
String applicationClientId,
String applicationKey)
{
return createWithAadApplicationCredentials(resourceUri, applicationClientId, applicationKey, null);
}
}
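
The two factory families mirror the username/password and appid/appkey settings in the connector config; for example (placeholder values):

// AAD user flow; the authority defaults to the Microsoft tenant when omitted (see AadAuthenticationHelper)
KustoConnectionStringBuilder userKcsb = KustoConnectionStringBuilder.createWithAadUserCredentials(
"https://{cluster_name}.kusto.windows.net", "{my_username}", "{my_password}");
// AAD application flow, as used by KustoSinkTask.createKustoIngestClient
KustoConnectionStringBuilder appKcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials(
"https://{cluster_name}.kusto.windows.net", "{my_app_id}", "{my_app_key}");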

View file

@@ -0,0 +1,222 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoClientAggregateException;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoClientException;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.util.Date;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
public class KustoIngestClient {
public static final int COMPRESSED_FILE_MULTIPLIER = 11;
public final Logger log = LoggerFactory.getLogger(KustoIngestClient.class);
public KustoClient m_kustoClient;
public ResourceManager m_resourceManager;
public KustoIngestClient(KustoConnectionStringBuilder kcsb) throws Exception {
log.info("Creating a new KustoIngestClient");
m_kustoClient = new KustoClient(kcsb);
m_resourceManager = new ResourceManager(m_kustoClient);
}
/*
* public Future ingestFromMultipleBlobsPaths(List<String> blobPaths, Boolean
* deleteSourceOnSuccess, KustoIngestionProperties ingestionProperties){
*
* ExecutorService executorService = Executors.newSingleThreadExecutor();
*
* return executorService.submit(new IngestFromMultipleBlobsCallable(blobPaths,
* deleteSourceOnSuccess, ingestionProperties, c_ingestionQueueUri)); }
*/
public IKustoIngestionResult ingestFromMultipleBlobsPaths(List<String> blobPaths, Boolean deleteSourceOnSuccess,
KustoIngestionProperties ingestionProperties) throws Exception {
List<BlobDescription> blobDescriptions = blobPaths.stream().map(b -> new BlobDescription(b, null))
.collect(Collectors.toList());
return ingestFromMultipleBlobsImpl(blobDescriptions, deleteSourceOnSuccess, ingestionProperties);
}
public IKustoIngestionResult ingestFromSingleBlob(String blobPath, Boolean deleteSourceOnSuccess,
KustoIngestionProperties ingestionProperties, Long rawDataSize) throws Exception {
BlobDescription blobDescription = new BlobDescription(blobPath, rawDataSize);
return ingestFromMultipleBlobsImpl(new ArrayList<>(Arrays.asList(blobDescription)), deleteSourceOnSuccess, ingestionProperties);
}
public IKustoIngestionResult ingestFromMultipleBlobs(List<BlobDescription> blobDescriptions,
Boolean deleteSourceOnSuccess, KustoIngestionProperties ingestionProperties) throws Exception {
return ingestFromMultipleBlobsImpl(blobDescriptions, deleteSourceOnSuccess, ingestionProperties);
}
public IKustoIngestionResult ingestFromMultipleBlobsImpl(List<BlobDescription> blobDescriptions,
Boolean deleteSourceOnSuccess, KustoIngestionProperties ingestionProperties) throws Exception {
if (blobDescriptions == null || blobDescriptions.size() == 0) {
throw new KustoClientException("blobs must have at least 1 path");
}
ingestionProperties.setAuthorizationContextToken(m_resourceManager.getKustoIdentityToken());
List<KustoClientException> ingestionErrors = new LinkedList<>();
List<IngestionStatusInTableDescription> tableStatuses = new LinkedList<>();
for (BlobDescription blobDescription : blobDescriptions) {
try {
// Create the ingestion message
IngestionBlobInfo ingestionBlobInfo = new IngestionBlobInfo(blobDescription.getBlobPath(),
ingestionProperties.getDatabaseName(), ingestionProperties.getTableName());
ingestionBlobInfo.rawDataSize = blobDescription.getBlobSize() != null ? blobDescription.getBlobSize()
: estimateBlobRawSize(blobDescription);
ingestionBlobInfo.retainBlobOnSuccess = !deleteSourceOnSuccess;
ingestionBlobInfo.reportLevel = ingestionProperties.getReportLevel();
ingestionBlobInfo.reportMethod = ingestionProperties.getReportMethod();
ingestionBlobInfo.flushImmediately = ingestionProperties.getFlushImmediately();
ingestionBlobInfo.additionalProperties = ingestionProperties.getAdditionalProperties();
if (blobDescription.getSourceId() != null) {
ingestionBlobInfo.id = blobDescription.getSourceId();
}
if (ingestionProperties.getReportMethod() != KustoIngestionProperties.IngestionReportMethod.Queue) {
String tableStatusUri = GetTableStatus();
ingestionBlobInfo.IngestionStatusInTable = new IngestionStatusInTableDescription();
ingestionBlobInfo.IngestionStatusInTable.TableConnectionString = tableStatusUri;
ingestionBlobInfo.IngestionStatusInTable.RowKey = ingestionBlobInfo.id.toString();
ingestionBlobInfo.IngestionStatusInTable.PartitionKey = ingestionBlobInfo.id.toString();
IngestionStatus status = new IngestionStatus(ingestionBlobInfo.id);
status.database = ingestionProperties.getDatabaseName();
status.table = ingestionProperties.getTableName();
status.status = OperationStatus.Pending;
status.updatedOn = Date.from(Instant.now());
status.ingestionSourceId = ingestionBlobInfo.id;
status.setIngestionSourcePath(blobDescription.getBlobPath());
AzureStorageHelper.azureTableInsertEntity(tableStatusUri, status);
tableStatuses.add(ingestionBlobInfo.IngestionStatusInTable);
}
ObjectMapper objectMapper = new ObjectMapper();
String serializedIngestionBlobInfo = objectMapper.writeValueAsString(ingestionBlobInfo);
AzureStorageHelper.postMessageToQueue(m_resourceManager.getAggregationQueue(), serializedIngestionBlobInfo);
} catch (Exception ex) {
ingestionErrors.add(
new KustoClientException(blobDescription.getBlobPath(), "Failed to post message to queue", ex));
}
}
if (!ingestionErrors.isEmpty()) {
throw new KustoClientAggregateException(ingestionErrors);
}
return new TableReportKustoIngestionResult(tableStatuses);
}
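/**
 * Uploads a local file to one of the DM's temp storage containers, builds a
 * SAS-qualified blob path, and ingests it as a single blob. The temp blob is
 * deleted once ingestion succeeds (deleteSourceOnSuccess = true).
 */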
public void ingestFromSingleFile(String filePath, KustoIngestionProperties ingestionProperties) throws Exception {
try {
String blobName = genBlobName(filePath, ingestionProperties.getDatabaseName(), ingestionProperties.getTableName());
CloudBlockBlob blob = AzureStorageHelper.uploadLocalFileToBlob(filePath, blobName, m_resourceManager.getStorageUri());
String blobPath = AzureStorageHelper.getBlobPathWithSas(blob);
long rawDataSize = ingestionProperties.getFileSize() != null ? ingestionProperties.getFileSize() : estimateLocalFileSize(filePath);
ingestFromSingleBlob(blobPath, true, ingestionProperties, rawDataSize);
} catch (Exception ex) {
log.error(String.format("ingestFromSingleFile: Error while uploading file (compression mode): %s. Error: %s", filePath, ex.getMessage()), ex);
throw ex;
}
}
public List<IngestionBlobInfo> GetAndDiscardTopIngestionFailures() throws Exception {
// Get the failed-ingestions queue from the DM and dequeue up to 32 messages
// (the Azure queue maximum per call). Each message is assumed to deserialize
// back into the IngestionBlobInfo that was posted at ingestion time, and is
// deleted from the queue - hence "discard".
KustoResults ingestionResources = m_kustoClient
.execute(CslCommandGenerator.generateIngestionResourcesShowCommand());
String failedIngestionsQueue = GetRandomResourceByResourceTypeName(ingestionResources, "FailedIngestionsQueue");
CloudQueue queue = new CloudQueue(new URI(failedIngestionsQueue));
ObjectMapper objectMapper = new ObjectMapper();
List<IngestionBlobInfo> failures = new LinkedList<>();
for (CloudQueueMessage message : queue.retrieveMessages(32, 5000, null, null)) {
failures.add(objectMapper.readValue(message.getMessageContentAsString(), IngestionBlobInfo.class));
queue.deleteMessage(message);
}
return failures;
}
public String GetTableStatus() throws Exception {
KustoResults result = m_kustoClient.execute(CslCommandGenerator.generateIngestionResourcesShowCommand());
return GetRandomResourceByResourceTypeName(result, "IngestionsStatusTable");
}
public String GetRandomResourceByResourceTypeName(KustoResults kustoResults, String name) {
ArrayList<String> results = new ArrayList<>();
for (ArrayList<String> row : kustoResults.getValues()) {
if (row.get(0).equals(name)) {
results.add(row.get(1));
}
}
Random randomizer = new Random();
return results.get(randomizer.nextInt(results.size()));
}
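// Estimates the uncompressed ("raw") size of a blob from its blob properties.
// Compressed blobs (.zip/.gz) are scaled by COMPRESSED_FILE_MULTIPLIER as a
// rough decompression estimate; returns null when the reported length is zero.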
public Long estimateBlobRawSize(BlobDescription blobDescription) throws Exception {
String blobPath = blobDescription.getBlobPath();
CloudBlockBlob blockBlob = new CloudBlockBlob(new URI(blobPath));
blockBlob.downloadAttributes();
long length = blockBlob.getProperties().getLength();
if (length == 0) {
return null;
}
if (blobPath.contains(".zip") || blobPath.contains(".gz")) {
length = length * COMPRESSED_FILE_MULTIPLIER;
}
return length;
}
public long estimateLocalFileSize(String filePath) {
File file = new File(filePath);
long fileSize = file.length();
if (filePath.endsWith(".zip") || filePath.endsWith(".gz")) {
fileSize = fileSize * COMPRESSED_FILE_MULTIPLIER;
}
return fileSize;
}
public String genBlobName(String filePath, String databaseName, String tableName) {
String fileName = (new File(filePath)).getName();
return String.format("%s__%s__%s__%s", databaseName, tableName, UUID.randomUUID().toString(), fileName);
}
}


@ -0,0 +1,113 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import java.util.HashMap;
import java.util.Map;
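/**
 * Describes how a single ingestion should be performed: target database and
 * table, report level and method, and free-form additional properties (data
 * format, mapping references, authorization context) that are forwarded to
 * the ingestion service.
 */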
public class KustoIngestionProperties implements Comparable<KustoIngestionProperties> {
private String m_databaseName;
private String m_tableName;
private Long m_fileSize;
private boolean m_flushImmediately;
private IngestionReportLevel m_reportLevel;
private IngestionReportMethod m_reportMethod;
private Map<String, String> m_additionalProperties;
public KustoIngestionProperties(String databaseName, String tableName, Long fileSize) {
m_databaseName = databaseName;
m_tableName = tableName;
m_reportLevel = IngestionReportLevel.FailuresOnly;
m_reportMethod = IngestionReportMethod.Queue;
m_flushImmediately = false;
m_fileSize = fileSize;
m_additionalProperties = new HashMap<>();
}
public String getDatabaseName() {
return m_databaseName;
}
public Long getFileSize() {
return m_fileSize;
}
public String getTableName() {
return m_tableName;
}
public boolean getFlushImmediately() {
return m_flushImmediately;
}
public void setFlushImmediately(boolean flushImmediately) {
m_flushImmediately = flushImmediately;
}
public IngestionReportLevel getReportLevel() {
return m_reportLevel;
}
public void setReportLevel(IngestionReportLevel reportLevel) {
m_reportLevel = reportLevel;
}
public IngestionReportMethod getReportMethod() {
return m_reportMethod;
}
public void setReportMethod(IngestionReportMethod reportMethod) {
m_reportMethod = reportMethod;
}
public Map<String, String> getAdditionalProperties() {
return m_additionalProperties;
}
public void setDataFormat(DATA_FORMAT dataFormat) {
m_additionalProperties.put("format", dataFormat.name());
}
/**
 * Sets the data format by its name. If the name is not a known
 * {@link DATA_FORMAT}, the format property is left unchanged.
 *
 * @param dataFormatName case-insensitive name of a supported data format
 */
public void setDataFormat(String dataFormatName) {
try {
DATA_FORMAT dataFormat = DATA_FORMAT.valueOf(dataFormatName.toLowerCase());
m_additionalProperties.put("format", dataFormat.name());
} catch (IllegalArgumentException e) {
// Unknown format name - leave the format property unset, as documented.
}
}
public void setJsonMappingName(String jsonMappingName) {
m_additionalProperties.put("jsonMappingReference", jsonMappingName);
m_additionalProperties.put("format", DATA_FORMAT.json.name());
}
public void setCsvMappingName(String mappingName) {
m_additionalProperties.put("csvMappingReference", mappingName);
m_additionalProperties.put("format", DATA_FORMAT.csv.name());
}
public void setAuthorizationContextToken(String token) {
m_additionalProperties.put("authorizationContext", token);
}
@Override
public int compareTo(KustoIngestionProperties o) {
// Compare by value rather than reference, and return 0 for equal instances
// as the compareTo contract requires.
return this.m_tableName.equals(o.m_tableName) && this.m_databaseName.equals(o.m_databaseName) ? 0 : -1;
}
public enum DATA_FORMAT {csv, tsv, scsv, sohsv, psv, txt, json, singlejson, avro, parquet}
public enum IngestionReportLevel {
FailuresOnly,
None,
FailuresAndSuccesses
}
public enum IngestionReportMethod {
Queue,
Table,
QueueAndTable
}
}


@ -0,0 +1,39 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import java.util.ArrayList;
import java.util.HashMap;
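/**
 * Tabular result of a Kusto command: a column-name-to-index map, a
 * column-name-to-type map, and the rows as lists of string values.
 */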
public class KustoResults {
private HashMap<String, Integer> m_columnNameToIndex;
private HashMap<String, String> m_columnNameToType;
private ArrayList<ArrayList<String>> m_values;
public KustoResults(HashMap<String, Integer> columnNameToIndex, HashMap<String, String> columnNameToType,
ArrayList<ArrayList<String>> values) {
m_columnNameToIndex = columnNameToIndex;
m_columnNameToType = columnNameToType;
m_values = values;
}
public HashMap<String, Integer> getColumnNameToIndex() {
return m_columnNameToIndex;
}
public HashMap<String, String> getColumnNameToType() {
return m_columnNameToType;
}
public Integer getIndexByColumnName(String columnName) {
return m_columnNameToIndex.get(columnName);
}
public String getTypeByColumnName(String columnName) {
return m_columnNameToType.get(columnName);
}
public ArrayList<ArrayList<String>> getValues() {
return m_values;
}
}


@ -0,0 +1,39 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
/**
 * An enum representing the state of a data ingestion operation into Kusto.
 */
public enum OperationStatus {
/**
 * Temporary status. Might change during the course of ingestion based on
 * the outcome of the data ingestion operation into Kusto.
 */
Pending,
/**
 * Permanent status. The data has been successfully ingested into Kusto.
 */
Succeeded,
/**
 * Permanent status. The data has not been ingested into Kusto.
 */
Failed,
/**
 * Permanent status. The data has been queued for ingestion.
 * (This does not indicate that the ingestion was successful.)
 */
Queued,
/**
 * Permanent status. No data was supplied for ingestion; the ingest
 * operation was skipped.
 */
Skipped,
/**
 * Permanent status. Part of the data was successfully ingested into Kusto
 * while some of it failed.
 */
PartiallySucceeded
}


@ -0,0 +1,178 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
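/**
 * Caches the DM's ingestion resources (aggregation queues, temp storage,
 * status tables) and the Kusto identity token, refreshing both on a fixed
 * one-hour schedule so ingest operations do not query the DM on every call.
 */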
public class ResourceManager {
// Constants:
private static final String SECURED_READY_FOR_AGGREGATION_QUEUE = "SecuredReadyForAggregationQueue";
private static final String FAILED_INGESTIONS_QUEUE = "FailedIngestionsQueue";
private static final String SUCCESSFUL_INGESTIONS_QUEUE = "SuccessfulIngestionsQueue";
private static final String TEMP_STORAGE = "TempStorage";
private static final String INGESTIONS_STATUS_TABLE = "IngestionsStatusTable";
// Ingestion Resources value lists:
private ArrayList<String> aggregationQueueList = new ArrayList<>();
private ArrayList<String> failedIngestionsQueueList = new ArrayList<>();
private ArrayList<String> successfulIngestionsQueueList = new ArrayList<>();
private ArrayList<String> tempStorageList = new ArrayList<>();
private ArrayList<String> ingestionsStatusTableList = new ArrayList<>();
// Identity Token
private String m_identityToken;
// Round-robin indexes:
private int aggregationQueueIdx = 0;
private int tempStorageIdx = 0;
private KustoClient m_kustoClient;
private final long refreshIngestionResourcesPeriod = 1000 * 60 * 60; // 1 hour
private Timer timer = new Timer(true);
private final Logger log = LoggerFactory.getLogger(ResourceManager.class);
public ResourceManager(KustoClient kustoClient) {
m_kustoClient = kustoClient;
TimerTask refreshIngestionResourceValuesTask = new TimerTask() {
@Override
public void run() {
try {
refreshIngestionResources();
} catch (Exception e) {
log.error(String.format("Error in refreshIngestionResources: %s.", e.getMessage()), e);
}
}
};
TimerTask refreshIngestionAuthTokenTask = new TimerTask() {
@Override
public void run() {
try {
refreshIngestionAuthToken();
} catch (Exception e) {
log.error(String.format("Error in refreshIngestionAuthToken: %s.", e.getMessage()), e);
}
}
};
timer.schedule(refreshIngestionAuthTokenTask, 0, refreshIngestionResourcesPeriod);
timer.schedule(refreshIngestionResourceValuesTask, 0, refreshIngestionResourcesPeriod);
}
public void clean() {
aggregationQueueList = new ArrayList<>();
failedIngestionsQueueList = new ArrayList<>();
successfulIngestionsQueueList = new ArrayList<>();
tempStorageList = new ArrayList<>();
ingestionsStatusTableList = new ArrayList<>();
}
public String getKustoIdentityToken() throws Exception {
if (m_identityToken == null) {
refreshIngestionAuthToken();
if (m_identityToken == null) {
throw new Exception("Unable to get Identity token");
}
}
return m_identityToken;
}
public String getStorageUri() throws Exception {
int arrSize = tempStorageList.size();
if (arrSize == 0) {
refreshIngestionResources();
arrSize = tempStorageList.size();
if (arrSize == 0) {
throw new Exception("Unable to get temp storages list");
}
}
// Round-robin over the values of tempStorageList:
tempStorageIdx = (tempStorageIdx + 1) % arrSize;
return tempStorageList.get(tempStorageIdx);
}
public String getAggregationQueue() throws Exception {
int arrSize = aggregationQueueList.size();
if (arrSize == 0) {
refreshIngestionResources();
arrSize = aggregationQueueList.size();
if (arrSize == 0) {
throw new Exception("Unable to get aggregation queues list");
}
}
// Round-robin over the values of aggregationQueueList:
aggregationQueueIdx = (aggregationQueueIdx + 1) % arrSize;
return aggregationQueueList.get(aggregationQueueIdx);
}
private void addValue(String key, String value) {
switch (key) {
case SECURED_READY_FOR_AGGREGATION_QUEUE:
aggregationQueueList.add(value);
break;
case FAILED_INGESTIONS_QUEUE:
failedIngestionsQueueList.add(value);
break;
case SUCCESSFUL_INGESTIONS_QUEUE:
successfulIngestionsQueueList.add(value);
break;
case TEMP_STORAGE:
tempStorageList.add(value);
break;
case INGESTIONS_STATUS_TABLE:
ingestionsStatusTableList.add(value);
break;
default:
log.warn("Unrecognized key: {}", key);
break;
}
}
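// Queries the DM for its current ingestion resources (queues, temp storage,
// status tables) and rebuilds the cached lists; each result row is a
// (resource type name, resource value) pair.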
private void refreshIngestionResources() throws Exception {
log.info("Refreshing Ingestion Resources");
KustoResults ingestionResourcesResults = m_kustoClient.execute(CslCommandGenerator.generateIngestionResourcesShowCommand());
ArrayList<ArrayList<String>> values = ingestionResourcesResults.getValues();
clean();
values.forEach(pairValues -> {
String key = pairValues.get(0);
String value = pairValues.get(1);
addValue(key, value);
});
}
private void refreshIngestionAuthToken() throws Exception {
log.info("Refreshing Ingestion Auth Token");
KustoResults authToken = m_kustoClient.execute(CslCommandGenerator.generateKustoIdentityGetCommand());
m_identityToken = authToken.getValues().get(0).get(authToken.getIndexByColumnName("AuthorizationContext"));
}
public ArrayList<String> getAggregationQueueList() {
return aggregationQueueList;
}
public ArrayList<String> getFailedIngestionsQueueList() {
return failedIngestionsQueueList;
}
public ArrayList<String> getSuccessfulIngestionsQueueList() {
return successfulIngestionsQueueList;
}
public ArrayList<String> getTempStorageList() {
return tempStorageList;
}
public ArrayList<String> getIngestionsStatusTableList() {
return ingestionsStatusTableList;
}
}


@ -0,0 +1,128 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.blob.*;
import java.io.IOException;
import java.net.URI;
import java.util.*;
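/**
 * Manual end-to-end sample: builds a connection string, creates a
 * KustoIngestClient, and queues blobs for ingestion. The placeholder values
 * (cluster URL, credentials, blob URIs) must be filled in before running.
 */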
public class Sample {
public static void main(String[] args) {
try {
// This works with the Microsoft AAD tenant. If another tenant is required, set this variable accordingly.
String authorityId = AadAuthenticationHelper.getMicrosoftAadAuthorityId();
//String engineClusterUrl = "https://<ClusterName>.kusto.windows.net";
String dmClusterUrl = "https://ingest-kustoppas2.kusto.windows.net";
String databaseName = "velostrata";
String tableName = "tmpLogs";
//String userUsername = "<AadUserUsername>";
//String userPassword = "<AadUserPassword>";
String applicationClientId = "";
String applicationKey = "";
//String dmClusterUrl = "https://ingest-roil.kusto.windows.net";
//String userUsername = "roil@microsoft.com";
//String userPassword = "";
// Kusto connection string that uses the given user and password during authentication
//KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadUserCredentials(dmClusterUrl, userUsername, userPassword, authorityId);
// Kusto connection string that uses the given application client ID and key during authentication
KustoConnectionStringBuilder kcsb =
KustoConnectionStringBuilder.createWithAadApplicationCredentials(dmClusterUrl, applicationClientId, applicationKey, authorityId);
// Create an instance of KustoClient that will be used for executing commands against the cluster
KustoIngestClient kustoClient = new KustoIngestClient(kcsb);
// Running a basic count query on the given database and table
//KustoResults results = kustoClient.execute(databaseName, String.format("%s | count", tableName));
// Get a list of EventHub connection strings from the DM cluster using KustoClient
//KustoResults eventHubSourceSettings = kustoClient.execute(CslCommandGenerator.generateDmEventHubSourceSettingsShowCommand());
// Ingest from a single blob
//String blobPath = "";
String blob1 = "";
String blob2 = "";
String blob3 = "";
KustoIngestionProperties ingestionProperties = new KustoIngestionProperties(databaseName, tableName, (long) 0);
//kustoClient.ingestFromSingleBlob(blobPath, false, ingestionProperties);
RunVelostrataIngest(kustoClient, ingestionProperties);
//kustoClient.ingestFromMultipleBlobs(new ArrayList<String>(Arrays.asList(blob1,blob2,blob3)), false, ingestionProperties);
//System.out.print("DONE !!!");
/*
Future task = kustoClient.ingestFromMultipleBlobsAsync(new ArrayList<String>(Arrays.asList(blob1,blob2,blob3)),false, ingestionProperties);
System.out.print("task done = " + task.isDone());
Object res = task.get();
System.out.print("task done = " + task.isDone());
System.out.print("DONE !!!");
*/
} catch (Exception e) {
e.printStackTrace();
}
}
private static void RunVelostrataIngest(KustoIngestClient kustoIngestClient, KustoIngestionProperties ingestionProperties) {
String containerUri = "";
try {
URI uri = new URI(containerUri);
CloudBlobContainer container = new CloudBlobContainer(uri);
ArrayList<String> blobs = new ArrayList<>();
CloudBlobDirectory directory = container.getDirectoryReference("ES");
Iterable<ListBlobItem> blobItems = directory.listBlobs();
StorageCredentialsSharedAccessSignature signature = (StorageCredentialsSharedAccessSignature)container.getServiceClient().getCredentials();
for (ListBlobItem blobItem: blobItems) {
blobs.add(blobItem.getUri().toString() + "?" + signature.getToken());
}
ingestionProperties.setJsonMappingName("LogsMapping1");
ingestionProperties.setFlushImmediately(true);
kustoIngestClient.ingestFromMultipleBlobsPaths(blobs, false, ingestionProperties);
System.out.print("DONE !!!");
} catch (Exception e) {
e.printStackTrace();
}
}
private static SharedAccessBlobPolicy createSharedAccessPolicy(EnumSet<SharedAccessBlobPermissions> sap, int expireTimeInSeconds) {
Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
cal.setTime(new Date());
cal.add(Calendar.SECOND, expireTimeInSeconds);
SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy();
policy.setPermissions(sap);
policy.setSharedAccessExpiryTime(cal.getTime());
return policy;
}
}


@ -0,0 +1,31 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
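/**
 * Sample that ingests a sequence of local JSON files (filePath + "1.json"
 * through "10.json") using AAD application credentials and a JSON mapping.
 */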
public class SampleFileIngestion {
private static final String appId = "";
private static final String appKey = "";
public static void main(String[] args) {
try {
String kustoClusterPath = "https://ingest-<cluster name>.kusto.windows.net/";
String filePath = "";
String dbName = "";
String tableName = "";
String dataMappingName = "";
String dataFormat = "json";
KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials(kustoClusterPath, appId, appKey);
KustoIngestionProperties ingestionProperties = new KustoIngestionProperties(dbName, tableName, (long) 0);
ingestionProperties.setJsonMappingName(dataMappingName);
KustoIngestClient client = new KustoIngestClient(kcsb);
for (int i = 1; i < 11; i++) {
client.ingestFromSingleFile(filePath + i + ".json", ingestionProperties);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}


@ -0,0 +1,33 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedList;
import java.util.List;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.TableOperation;
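/**
 * IKustoIngestionResult backed by the ingestion-status Azure table: each
 * descriptor points at the table row (partition key / row key) that the
 * service updates as the corresponding blob moves through ingestion.
 */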
public class TableReportKustoIngestionResult implements IKustoIngestionResult {
private List<IngestionStatusInTableDescription> m_descriptors;
public TableReportKustoIngestionResult(List<IngestionStatusInTableDescription> descriptors) {
m_descriptors = descriptors;
}
@Override
public List<IngestionStatus> GetIngestionStatusCollection() throws StorageException, URISyntaxException {
List<IngestionStatus> results = new LinkedList<>();
for (IngestionStatusInTableDescription descriptor : m_descriptors) {
CloudTable table = new CloudTable(new URI(descriptor.TableConnectionString));
TableOperation operation = TableOperation.retrieve(descriptor.PartitionKey, descriptor.RowKey,
IngestionStatus.class);
results.add(table.execute(operation).getResultAsType());
}
return results;
}
}


@ -0,0 +1,85 @@
package com.microsoft.azure.kusto.kafka.connect.sink.client;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoClientExceptions.KustoWebException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
public class Utils {
public static boolean isNullOrEmpty(String str) {
return str == null || str.isEmpty();
}
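/**
 * POSTs a command payload to the given Kusto endpoint and parses the v1 REST
 * response: the body carries an array of tables, and the first table holds
 * the command results as "Columns" (name/type) and "Rows" (string values).
 */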
public static KustoResults post(String url, String aadAccessToken, String payload) throws Exception {
HttpClient httpClient = HttpClients.createDefault();
HttpPost httpPost = new HttpPost(url);
// Request parameters and other properties.
StringEntity requestEntity = new StringEntity(
payload,
ContentType.APPLICATION_JSON);
httpPost.setEntity(requestEntity);
httpPost.addHeader("Authorization", String.format("Bearer %s", aadAccessToken));
httpPost.addHeader("Content-Type", "application/json");
httpPost.addHeader("Accept-Encoding", "gzip,deflate");
httpPost.addHeader("Fed", "True");
httpPost.addHeader("x-ms-client-version", "Kusto.Java.Client");
//Execute and get the response.
HttpResponse response = httpClient.execute(httpPost);
HttpEntity entity = response.getEntity();
if (entity != null) {
StatusLine statusLine = response.getStatusLine();
String responseContent = EntityUtils.toString(entity);
if (statusLine.getStatusCode() == 200) {
JSONObject jsonObject = new JSONObject(responseContent);
JSONArray tablesArray = jsonObject.getJSONArray("Tables");
JSONObject table0 = tablesArray.getJSONObject(0);
JSONArray resultsColumns = table0.getJSONArray("Columns");
HashMap<String, Integer> columnNameToIndex = new HashMap<String, Integer>();
HashMap<String, String> columnNameToType = new HashMap<String, String>();
for (int i = 0; i < resultsColumns.length(); i++) {
JSONObject column = resultsColumns.getJSONObject(i);
String columnName = column.getString("ColumnName");
columnNameToIndex.put(columnName, i);
columnNameToType.put(columnName, column.getString("DataType"));
}
JSONArray resultsRows = table0.getJSONArray("Rows");
ArrayList<ArrayList<String>> values = new ArrayList<ArrayList<String>>();
for (int i = 0; i < resultsRows.length(); i++) {
JSONArray row = resultsRows.getJSONArray(i);
ArrayList<String> rowVector = new ArrayList<String>();
for (int j = 0; j < row.length(); j++) {
rowVector.add(row.getString(j));
}
values.add(rowVector);
}
return new KustoResults(columnNameToIndex, columnNameToType, values);
} else {
throw new KustoWebException(responseContent, response);
}
}
return null;
}
}


@ -0,0 +1 @@
version=${project.version}


@ -0,0 +1,117 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.testng.Assert;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class GZIPFileWriterTest {
File currentDirectory;
@Before
public final void before() {
currentDirectory = new File(Paths.get(
System.getProperty("java.io.tmpdir"),
GZIPFileWriter.class.getSimpleName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
}
@After
public final void after() {
try {
FileUtils.deleteDirectory(currentDirectory);
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testOpen() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testWriterOpen").toString();
File folder = new File(path);
folder.mkdirs();
Assert.assertEquals(folder.listFiles().length, 0);
HashMap<String, Long> files = new HashMap<String, Long>();
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;
Consumer<GZIPFileDescriptor> trackFiles = (GZIPFileDescriptor f) -> {
files.put(f.path, f.rawBytes);
};
Supplier<String> generateFileName = () -> FILE_PATH;
GZIPFileWriter gzipFileWriter = new GZIPFileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName);
gzipFileWriter.openFile();
Assert.assertEquals(folder.listFiles().length, 1);
Assert.assertEquals(gzipFileWriter.currentFile.rawBytes, 0);
Assert.assertEquals(gzipFileWriter.currentFile.path, FILE_PATH + ".gz");
Assert.assertTrue(gzipFileWriter.currentFile.file.canWrite());
gzipFileWriter.rollback();
}
@Test
public void testGzipFileWriter() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter").toString();
File folder = new File(path);
folder.mkdirs();
Assert.assertEquals(folder.listFiles().length, 0);
HashMap<String, Long> files = new HashMap<String, Long>();
final int MAX_FILE_SIZE = 128;
Consumer<GZIPFileDescriptor> trackFiles = (GZIPFileDescriptor f) -> {
files.put(f.path, f.rawBytes);
};
Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString();
GZIPFileWriter gzipFileWriter = new GZIPFileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName);
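// Each formatted message is 53 bytes, so with a 128-byte threshold the writer
// rolls before the write that would cross it: nine messages produce four
// closed 106-byte files plus one open 53-byte file that is flushed on close().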
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other side", i);
gzipFileWriter.write(msg.getBytes("UTF-8"));
}
Assert.assertEquals(files.size(), 4);
// should still have 1 open file at this point...
Assert.assertEquals(folder.listFiles().length, 1);
// close current file
gzipFileWriter.close();
Assert.assertEquals(files.size(), 5);
List<Long> sortedFiles = new ArrayList<Long>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
Assert.assertEquals(sortedFiles, Arrays.asList((long) 106, (long) 106, (long) 106, (long) 106, (long) 53));
// make sure folder is clear once done
Assert.assertEquals(folder.listFiles().length, 0);
}
}


@ -0,0 +1,300 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class KustoSinkTaskTest {
File currentDirectory;
@Before
public final void before() {
currentDirectory = new File(Paths.get(
System.getProperty("java.io.tmpdir"),
GZIPFileWriter.class.getName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
}
@After
public final void after() {
currentDirectory.delete();
}
@Test
public void testSinkTaskOpen() throws Exception {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_DB, "db1");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
KustoSinkTask kustoSinkTask = new KustoSinkTask();
kustoSinkTask.start(props);
ArrayList<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic1", 1));
tps.add(new TopicPartition("topic1", 2));
tps.add(new TopicPartition("topic2", 1));
kustoSinkTask.open(tps);
assertEquals(kustoSinkTask.writers.size(), 3);
}
@Test
public void testSinkTaskPutRecord() throws Exception {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_DB, "db1");
props.put(KustoSinkConfig.KUSTO_SINK_TEMPDIR, System.getProperty("java.io.tmpdir"));
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
KustoSinkTask kustoSinkTask = new KustoSinkTask();
kustoSinkTask.start(props);
ArrayList<TopicPartition> tps = new ArrayList<>();
TopicPartition tp = new TopicPartition("topic1", 1);
tps.add(tp);
kustoSinkTask.open(tps);
List<SinkRecord> records = new ArrayList<SinkRecord>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
kustoSinkTask.put(records);
assertEquals(kustoSinkTask.writers.get(tp).currentOffset, 10);
}
@Test
public void testSinkTaskPutRecordMissingPartition() throws Exception {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_DB, "db1");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
KustoSinkTask kustoSinkTask = new KustoSinkTask();
kustoSinkTask.start(props);
ArrayList<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic1", 1));
kustoSinkTask.open(tps);
List<SinkRecord> records = new ArrayList<SinkRecord>();
records.add(new SinkRecord("topic2", 1, null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
Throwable exception = assertThrows(ConnectException.class, () -> kustoSinkTask.put(records));
assertEquals(exception.getMessage(), "Received a record without a mapped writer for topic:partition(topic2:1), dropping record.");
}
@Test
public void getTopicsToTablesSingleValue() {
KustoSinkConfig mockedSinkConfig = mock(KustoSinkConfig.class);
when(mockedSinkConfig.getKustoTable()).thenReturn("table1");
Map<String, String> actual = KustoSinkTask.getTopicsToTables(mockedSinkConfig);
Assert.assertEquals(actual.size(), 1);
Assert.assertEquals(actual.get(KustoSinkTask.TOPICS_WILDCARD), "table1");
}
@Test
public void getTopicsToTablesActualMapping() {
KustoSinkConfig mockedSinkConfig = mock(KustoSinkConfig.class);
when(mockedSinkConfig.getKustoTopicToTableMapping()).thenReturn("topic1:table1;topic2:table2;");
Map<String, String> actual = KustoSinkTask.getTopicsToTables(mockedSinkConfig);
Assert.assertEquals(actual.size(), 2);
Assert.assertEquals(actual.get(KustoSinkTask.TOPICS_WILDCARD), null);
Assert.assertEquals(actual.get("topic1"), "table1");
Assert.assertEquals(actual.get("topic2"), "table2");
Assert.assertEquals(actual.get("topic3"), null);
}
@Test
public void sinkStartMissingUrlOrDbOrTables() {
HashMap<String, String> props = new HashMap<>();
KustoSinkTask kustoSinkTask = new KustoSinkTask();
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Missing required configuration \"kusto.url\" which has no default value.");
}
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Missing required configuration \"kusto.db\" which has no default value.");
}
props.put(KustoSinkConfig.KUSTO_DB, "db1");
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto table mapping must be provided.");
}
props.put(KustoSinkConfig.KUSTO_TABLE, "table3");
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication method must be provided.");
}
props.remove(KustoSinkConfig.KUSTO_TABLE);
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;");
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication method must be provided.");
}
// check malformed table mapping throws properly
props.remove(KustoSinkConfig.KUSTO_TABLES_MAPPING);
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1");
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Provided table mapping is malformed. please make sure table mapping is of 'topicName:tableName;' format.");
}
}
@Test
public void sinkStartMissingAuth() {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_DB, "db1");
props.put(KustoSinkConfig.KUSTO_TABLE, "table3");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;");
KustoSinkTask kustoSinkTask = new KustoSinkTask();
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication method must be provided.");
}
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication missing Password.");
}
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
{
// should not throw any errors
kustoSinkTask.start(props);
assertNotNull(kustoSinkTask.kustoIngestClient);
}
props.remove(KustoSinkConfig.KUSTO_AUTH_USERNAME);
props.remove(KustoSinkConfig.KUSTO_AUTH_PASSWORD);
props.put(KustoSinkConfig.KUSTO_AUTH_APPID, "appid");
{
Throwable exception = assertThrows(ConnectException.class, () -> {
kustoSinkTask.start(props);
});
assertEquals(exception.getMessage(), "Kusto Connector failed to start due to configuration error. Kusto authentication missing App Key.");
}
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY, "appkey");
{
// should not throw any errors
kustoSinkTask.start(props);
assertNotNull(kustoSinkTask.kustoIngestClient);
}
}
@Test
public void getTable() {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL, "https://{cluster_name}.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_DB, "db1");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING, "topic1:table1;topic2:table2;");
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME, "test@test.com");
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD, "123456!");
KustoSinkTask kustoSinkTask = new KustoSinkTask();
kustoSinkTask.start(props);
{
// mapped topics should resolve to their configured tables; unmapped topics resolve to null
Assert.assertEquals(kustoSinkTask.getTable("topic1"), "table1");
Assert.assertEquals(kustoSinkTask.getTable("topic2"), "table2");
Assert.assertEquals(kustoSinkTask.getTable("topic3"), null);
}
// assert that single table takes precedence over mapping
props.put(KustoSinkConfig.KUSTO_TABLE, "table3");
{
kustoSinkTask.start(props);
Assert.assertEquals(kustoSinkTask.getTable("topic3"), "table3");
}
}
}


@ -0,0 +1,210 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestClient;
import com.microsoft.azure.kusto.kafka.connect.sink.client.KustoIngestionProperties;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Mockito.*;
public class TopicPartitionWriterTest {
// TODO: should probably find a better way to mock internal class (GZIPFileWriter)...
File currentDirectory;
@Before
public final void before() {
currentDirectory = new File(Paths.get(
System.getProperty("java.io.tmpdir"),
GZIPFileWriter.class.getSimpleName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
}
@After
public final void after() {
try {
FileUtils.deleteDirectory(currentDirectory);
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testHandleRollfile() {
TopicPartition tp = new TopicPartition("testPartition", 11);
KustoIngestClient mockedClient = mock(KustoIngestClient.class);
String db = "testdb1";
String table = "testtable1";
String basePath = "somepath";
long fileThreshold = 100;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, db, table, basePath, fileThreshold);
GZIPFileDescriptor descriptor = new GZIPFileDescriptor();
descriptor.rawBytes = 1024;
descriptor.path = "somepath/somefile";
writer.handleRollFile(descriptor);
KustoIngestionProperties kustoIngestionProperties = new KustoIngestionProperties(db, table, descriptor.rawBytes);
ArgumentCaptor<String> pathArgument = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<KustoIngestionProperties> ingestionPropertiesArgumentCaptor = ArgumentCaptor.forClass(KustoIngestionProperties.class);
try {
verify(mockedClient, only()).ingestFromSingleFile(pathArgument.capture(), ingestionPropertiesArgumentCaptor.capture());
} catch (Exception e) {
e.printStackTrace();
}
Assert.assertEquals(pathArgument.getValue(), descriptor.path);
Assert.assertEquals(table, ingestionPropertiesArgumentCaptor.getValue().getTableName());
Assert.assertEquals(db, ingestionPropertiesArgumentCaptor.getValue().getDatabaseName());
Assert.assertTrue(ingestionPropertiesArgumentCaptor.getValue().getFileSize().equals((long) 1024));
}
@Test
public void testGetFilename() {
TopicPartition tp = new TopicPartition("testTopic", 11);
KustoIngestClient mockClient = mock(KustoIngestClient.class);
String db = "testdb1";
String table = "testtable1";
String basePath = "somepath";
long fileThreshold = 100;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold);
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_0").toString());
}
@Test
public void testGetFilenameAfterOffsetChanges() {
TopicPartition tp = new TopicPartition("testTopic", 11);
KustoIngestClient mockClient = mock(KustoIngestClient.class);
String db = "testdb1";
String table = "testtable1";
String basePath = "somepath";
long fileThreshold = 100;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message", 3));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}", 4));
for (SinkRecord record : records) {
writer.writeRecord(record);
}
Assert.assertEquals(writer.getFilePath(), Paths.get(basePath, "kafka_testTopic_11_5").toString());
}
@Test
public void testOpenClose() {
TopicPartition tp = new TopicPartition("testPartition", 1);
KustoIngestClient mockClient = mock(KustoIngestClient.class);
String db = "testdb1";
String table = "testtable1";
String basePath = "somepath";
long fileThreshold = 100;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold);
writer.open();
writer.close();
}
@Test
public void testWriteNonStringAndOffset() throws Exception {
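// TODO: disabled draft - appears to depend on a DummyRecord helper and
// non-string value handling that are not part of this commit.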
// TopicPartition tp = new TopicPartition("testPartition", 11);
// KustoIngestClient mockClient = mock(KustoIngestClient.class);
// String db = "testdb1";
// String table = "testtable1";
// String basePath = "somepath";
// long fileThreshold = 100;
//
// TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold);
//
// List<SinkRecord> records = new ArrayList<SinkRecord>();
// DummyRecord dummyRecord1 = new DummyRecord(1, "a", (long) 2);
// DummyRecord dummyRecord2 = new DummyRecord(2, "b", (long) 4);
//
// records.add(new SinkRecord("topic", 1, null, null, null, dummyRecord1, 10));
// records.add(new SinkRecord("topic", 2, null, null, null, dummyRecord2, 3));
// records.add(new SinkRecord("topic", 2, null, null, null, dummyRecord2, 4));
//
// for (SinkRecord record : records) {
// writer.writeRecord(record);
// }
//
// Assert.assertEquals(writer.getFilePath(), "kafka_testPartition_11_0");
}
@Test
public void testWriteStringyValuesAndOffset() throws Exception {
TopicPartition tp = new TopicPartition("testTopic", 2);
KustoIngestClient mockClient = mock(KustoIngestClient.class);
String db = "testdb1";
String table = "testtable1";
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
long fileThreshold = 100;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message", 3));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}", 4));
for (SinkRecord record : records) {
writer.writeRecord(record);
}
Assert.assertEquals(writer.gzipFileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 3)).toString());
}
@Test
public void testWriteBytesValuesAndOffset() throws Exception {
TopicPartition tp = new TopicPartition("testPartition", 11);
KustoIngestClient mockClient = mock(KustoIngestClient.class);
String db = "testdb1";
String table = "testtable1";
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
long fileThreshold = 50;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, db, table, basePath, fileThreshold);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message".getBytes(StandardCharsets.UTF_8), 13));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 14));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 15));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 16));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}".getBytes(StandardCharsets.UTF_8), 17));
for (SinkRecord record : records) {
writer.writeRecord(record);
}
//TODO : file threshold ignored?
Assert.assertTrue(writer.lastCommitedOffset.equals((long) 16));
Assert.assertEquals(writer.gzipFileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.gz", tp.topic(), tp.partition(), 17)).toString());
}
}