diff --git a/.gitignore b/.gitignore
index f4bb20c..9127f22 100644
--- a/.gitignore
+++ b/.gitignore
@@ -289,7 +289,7 @@ paket-files/
# JetBrains Rider
.idea/
-*.sln.iml
+*.iml
# CodeRush
.cr/
diff --git a/README.md b/README.md
index 0aa5526..4141ddc 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,7 @@ Integration mode to Azure Data Explorer is batched, queued ingestion leveraging
### 3.3. Configurable retries
-- The connector supports retries for transient errors with the ability to provide parameters for the same
+- The connector supports retries for transient errors with the ability to provide relevant parameters
- and retries with exponential backoff
### 3.4. Serialization formats
@@ -193,7 +193,7 @@ KafkaClient {
**4. Configs to add to the Docker image:**
-This is covered in detail further on. It is specified here for the purpose of completenes of defining what goes onto the worker config.
+This is covered in detail further on; it is specified here for completeness in defining what goes into the worker config.
```
COPY krb5.conf /etc/krb5.conf
COPY hdi-esp-jaas.conf /etc/hdi-esp-jaas.conf
@@ -219,39 +219,40 @@ The following is complete set of connector sink properties-
| :--- | :--- | :--- | :--- |
| 1 | connector.class | Classname of the Kusto sink | Hard code to ``` com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector ```
*Required* |
| 2 | topics | Kafka topic specification | List of topics separated by commas
*Required* |
-| 3 | kusto.url | Kusto ingest cluster specification | Provide the ingest URI of your ADX cluster
Use the following construct for the private URL - https://ingest-private-[cluster].kusto.windows.net
*Required* |
-| 4 | aad.auth.authority | Credentials for Kusto | Provide the tenant ID of your Azure Active Directory
*Required* |
-| 5 | aad.auth.appid | Credentials for Kusto | Provide Azure Active Directory Service Principal Name
*Required* |
-| 6 | aad.auth.appkey | Credentials for Kusto | Provide Azure Active Directory Service Principal secret
*Required* |
-| 7 | kusto.tables.topics.mapping | Mapping of topics to tables | Provide 1..many topic-table comma-separated mappings as follows-
[{'topic': '\','db': '\', 'table': '\','format': '', 'mapping':'\'}]
*Required* |
-| 8 | key.converter | Deserialization | One of the below supported-
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.avro.AvroConverter
io.confluent.connect.json.JsonSchemaConverter
org.apache.kafka.connect.converters.ByteArrayConverter
*Required* |
-| 9 | value.converter | Deserialization | One of the below supported-
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.avro.AvroConverter
io.confluent.connect.json.JsonSchemaConverter
org.apache.kafka.connect.converters.ByteArrayConverter
*Required* |
-| 10 | value.converter.schema.registry.url | Schema validation | URI of the Kafka schema registry
*Optional* |
-| 11 | value.converter.schemas.enable | Schema validation | Set to true if you have embedded schema with payload but are not leveraging the schema registry
Applicable for avro and json
*Optional* |
-| 12 | tasks.max | connector parallelism | Specify the number of connector copy/sink tasks
*Required* |
-| 13 | flush.size.bytes | Performance knob for batching | Maximum bufer byte size per topic+partition combination that in combination with flush.interval.ms (whichever is reached first) should result in sinking to Kusto
*Default - 1 MB*
*Required* |
-| 14 | flush.interval.ms | Performance knob for batching | Minimum time interval per topic+partition combo that in combination with flush.size.bytes (whichever is reached first) should result in sinking to Kusto
*Default - 300 ms*
*Required* |
-| 15 | tempdir.path | Local directory path on Kafka Connect worker to buffer files to before shipping to Kusto | Default is value returned by ```System.getProperty("java.io.tmpdir")``` with a GUID attached to it
*Optional* |
-| 16 | behavior.on.error | Configurable behavior in response to errors encountered | Possible values - log, ignore, fail
log - log the error, send record to dead letter queue, and continue processing
ignore - log the error, send record to dead letter queue, proceed with processing despite errors encountered
fail - shut down connector task upon encountering
*Default - fail*
*Optional* |
-| 17 | errors.retry.max.time.ms | Configurable retries for transient errors | Period of time in milliseconds to retry for transient errors
*Default - 300 ms*
*Optional* |
-| 18 | errors.retry.backoff.time.ms | Configurable retries for transient errors | Period of time in milliseconds to backoff before retry for transient errors
*Default - 10 ms*
*Optional* |
-| 19 | errors.deadletterqueue.bootstrap.servers | Channel to write records that failed deserialization | CSV or kafkaBroker:port
*Optional* |
-| 20 | errors.deadletterqueue.topic.name | Channel to write records that failed deserialization | Pre-created topic name
*Optional* |
-| 21 | errors.deadletterqueue.security.protocol | Channel to write records that failed deserialization | Securitry protocol of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
-| 22 | errors.deadletterqueue.sasl.mechanism | Channel to write records that failed deserialization | SASL mechanism of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
-| 23 | errors.deadletterqueue.sasl.jaas.config | Channel to write records that failed deserialization | JAAS config of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
-| 24 | misc.deadletterqueue.bootstrap.servers | Channel to write records that due to reasons other than deserialization | CSV of kafkaBroker:port
*Optional* |
-| 25 | misc.deadletterqueue.topic.name | Channel to write records that due to reasons other than deserialization | Pre-created topic name
*Optional* |
-| 26 | misc.deadletterqueue.security.protocol | Channel to write records that due to reasons other than deserialization | Securitry protocol of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
-| 27 | misc.deadletterqueue.sasl.mechanism | Channel to write records that due to reasons other than deserialization | SASL mechanism of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
-| 28 | misc.deadletterqueue.sasl.jaas.config | Channel to write records that due to reasons other than deserialization | JAAS config of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
-| 29 | consumer.override.bootstrap.servers | Security details explicitly required for secure Kafka clusters | Bootstrap server:port CSV of secure Kafka cluster
*Required for secure Kafka clusters* |
-| 30 | consumer.override.security.protocol | Security details explicitly required for secure Kafka clusters | Security protocol of secure Kafka cluster
*Required for secure Kafka clusters* |
-| 31 | consumer.override.sasl.mechanism | Security details explicitly required for secure Kafka clusters | SASL mechanism of secure Kafka cluster
*Required for secure Kafka clusters* |
-| 32 | consumer.override.sasl.jaas.config | Security details explicitly required for secure Kafka clusters | JAAS config of secure Kafka cluster
*Required for secure Kafka clusters* |
-| 33 | consumer.override.sasl.kerberos.service.name | Security details explicitly required for secure Kafka clusters, specifically kerberized Kafka | Kerberos service name of kerberized Kafka cluster
*Required for kerberized Kafka clusters* |
-| 34 | consumer.override.auto.offset.reset | Configurable consuming from offset | Possible values are - earliest or latest
*Optional* |
-| 35 | consumer.override.max.poll.interval.ms| Config to prevent duplication | Set to a value to avoid consumer leaving the group while the Connector is retrying
*Optional* |
+| 3 | kusto.ingestion.url | Kusto ingestion endpoint URL | Provide the ingest URL of your ADX cluster
Use the following construct for the private URL - https://ingest-private-[cluster].kusto.windows.net
*Required* |
+| 4 | kusto.query.url | Kusto query endpoint URL | Provide the engine URL of your ADX cluster
*Optional* |
+| 5 | aad.auth.authority | Credentials for Kusto | Provide the tenant ID of your Azure Active Directory
*Required* |
+| 6 | aad.auth.appid | Credentials for Kusto | Provide Azure Active Directory Service Principal Name
*Required* |
+| 7 | aad.auth.appkey | Credentials for Kusto | Provide Azure Active Directory Service Principal secret
*Required* |
+| 8 | kusto.tables.topics.mapping | Mapping of topics to tables | Provide 1..many topic-table comma-separated mappings as follows-
[{'topic': '<topic name>','db': '<database name>', 'table': '<table name>','format': '<format>', 'mapping':'<mapping name>'}]
*Required* |
+| 9 | key.converter | Deserialization | One of the below supported-
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.avro.AvroConverter
io.confluent.connect.json.JsonSchemaConverter
org.apache.kafka.connect.converters.ByteArrayConverter
*Required* |
+| 10 | value.converter | Deserialization | One of the below supported-
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.avro.AvroConverter
io.confluent.connect.json.JsonSchemaConverter
org.apache.kafka.connect.converters.ByteArrayConverter
*Required* |
+| 11 | value.converter.schema.registry.url | Schema validation | URI of the Kafka schema registry
*Optional* |
+| 12 | value.converter.schemas.enable | Schema validation | Set to true if you have embedded schema with payload but are not leveraging the schema registry
Applicable for avro and json
*Optional* |
+| 13 | tasks.max | connector parallelism | Specify the number of connector copy/sink tasks
*Required* |
+| 14 | flush.size.bytes | Performance knob for batching | Maximum buffer size in bytes per topic+partition combination that, in combination with flush.interval.ms (whichever is reached first), triggers sinking to Kusto
*Default - 1 MB*
*Required* |
+| 15 | flush.interval.ms | Performance knob for batching | Minimum time interval per topic+partition combination that, in combination with flush.size.bytes (whichever is reached first), triggers sinking to Kusto
*Default - 300 ms*
*Required* |
+| 16 | tempdir.path | Local directory path on Kafka Connect worker to buffer files to before shipping to Kusto | Default is value returned by ```System.getProperty("java.io.tmpdir")``` with a GUID attached to it
*Optional* |
+| 17 | behavior.on.error | Configurable behavior in response to errors encountered | Possible values - log, ignore, fail
log - log the error, send record to dead letter queue, and continue processing
ignore - log the error, send record to dead letter queue, proceed with processing despite errors encountered
fail - shut down the connector task upon encountering an error
*Default - fail*
*Optional* |
+| 18 | errors.retry.max.time.ms | Configurable retries for transient errors | Period of time in milliseconds to retry for transient errors
*Default - 300 ms*
*Optional* |
+| 19 | errors.retry.backoff.time.ms | Configurable retries for transient errors | Period of time in milliseconds to back off before retrying on transient errors
*Default - 10 ms*
*Optional* |
+| 20 | errors.deadletterqueue.bootstrap.servers | Channel to write records that failed deserialization | CSV of kafkaBroker:port
*Optional* |
+| 21 | errors.deadletterqueue.topic.name | Channel to write records that failed deserialization | Pre-created topic name
*Optional* |
+| 22 | errors.deadletterqueue.security.protocol | Channel to write records that failed deserialization | Security protocol of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
+| 23 | errors.deadletterqueue.sasl.mechanism | Channel to write records that failed deserialization | SASL mechanism of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
+| 24 | errors.deadletterqueue.sasl.jaas.config | Channel to write records that failed deserialization | JAAS config of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
+| 25 | misc.deadletterqueue.bootstrap.servers | Channel to write records that failed due to reasons other than deserialization | CSV of kafkaBroker:port
*Optional* |
+| 26 | misc.deadletterqueue.topic.name | Channel to write records that failed due to reasons other than deserialization | Pre-created topic name
*Optional* |
+| 27 | misc.deadletterqueue.security.protocol | Channel to write records that failed due to reasons other than deserialization | Security protocol of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
+| 28 | misc.deadletterqueue.sasl.mechanism | Channel to write records that failed due to reasons other than deserialization | SASL mechanism of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
+| 29 | misc.deadletterqueue.sasl.jaas.config | Channel to write records that failed due to reasons other than deserialization | JAAS config of secure Kafka cluster
*Optional but when feature is used with secure cluster, is required* |
+| 30 | consumer.override.bootstrap.servers | Security details explicitly required for secure Kafka clusters | Bootstrap server:port CSV of secure Kafka cluster
*Required for secure Kafka clusters* |
+| 31 | consumer.override.security.protocol | Security details explicitly required for secure Kafka clusters | Security protocol of secure Kafka cluster
*Required for secure Kafka clusters* |
+| 32 | consumer.override.sasl.mechanism | Security details explicitly required for secure Kafka clusters | SASL mechanism of secure Kafka cluster
*Required for secure Kafka clusters* |
+| 33 | consumer.override.sasl.jaas.config | Security details explicitly required for secure Kafka clusters | JAAS config of secure Kafka cluster
*Required for secure Kafka clusters* |
+| 34 | consumer.override.sasl.kerberos.service.name | Security details explicitly required for secure Kafka clusters, specifically kerberized Kafka | Kerberos service name of kerberized Kafka cluster
*Required for kerberized Kafka clusters* |
+| 35 | consumer.override.auto.offset.reset | Configurable consuming from offset | Possible values are - earliest or latest
*Optional* |
+| 36 | consumer.override.max.poll.interval.ms | Config to prevent duplication | Set to a value that avoids the consumer leaving the group while the connector is retrying
*Optional* |
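+
+The snippet below is a minimal, illustrative sink properties set assembled only from the settings above; cluster, database, table, and credential values are placeholders to replace with your own.
+```
+connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
+tasks.max=1
+topics=testing1
+kusto.ingestion.url=https://ingest-{cluster}.kusto.windows.net
+kusto.query.url=https://{cluster}.kusto.windows.net
+aad.auth.authority={aad-tenant-id}
+aad.auth.appid={aad-app-id}
+aad.auth.appkey={aad-app-secret}
+kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'}]
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+```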
@@ -272,12 +273,12 @@ The following is the roadmap-
Kafka Connect connectors can be deployed in standalone mode (just for development) or in distributed mode (production).
### 7.1. Standalone Kafka Connect deployment mode
-This involves having the connector plugin jar in /usr/share/java of a Kafka Connect worker, reference to the same plugin path in connnect-standalone.properties, and launching of the connector from command line. This is not scalable, not fault tolerant, and is not recommeded for production.
+This involves having the connector plugin jar in /usr/share/java of a Kafka Connect worker, referencing that plugin path in connect-standalone.properties, and launching the connector from the command line. This is not scalable, not fault tolerant, and is not recommended for production.
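+
+Assuming a standard Kafka installation layout, a standalone launch typically looks like the example below; connect-standalone.properties ships with Kafka (not this repo) and must point plugin.path at the directory holding the connector jar.
+```
+bin/connect-standalone.sh config/connect-standalone.properties config/connect-kusto-sink.properties
+```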
### 7.2. Distributed Kafka Connect deployment mode
Distributed Kafka Connect essentially involves creation of a KafkaConnect worker cluster as shown in the diagram below.
- Azure Kubernetes Service is a great infrastructure for the connect cluster, due to its managed and scalable nature
-- Kubernetes is a great platform for the connect cluster, due to its scalabale nature and self-healing
+- Kubernetes is a great platform for the connect cluster, due to its scalable nature and self-healing
- Each orange polygon is a Kafka Connect worker and each green polygon is a sink connector instance
- A Kafka Connect worker can have 1..many task instances which helps with scale
- When a Kafka Connect worker is maxed out from a resource perspective (CPU, RAM), you can scale horizontally, add more Kafka Connect workers, and tasks within them
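+
+In distributed mode the connector is typically submitted to the worker cluster through the Kafka Connect REST API (default port 8083) rather than via a local properties file; the host and values below are placeholders, reusing the settings documented above.
+```
+curl -X POST -H "Content-Type: application/json" http://{connect-worker}:8083/connectors -d '{
+  "name": "KustoSinkConnector",
+  "config": {
+    "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
+    "topics": "testing1",
+    "kusto.ingestion.url": "https://ingest-{cluster}.kusto.windows.net",
+    "kusto.query.url": "https://{cluster}.kusto.windows.net",
+    "aad.auth.authority": "{aad-tenant-id}",
+    "aad.auth.appid": "{aad-app-id}",
+    "aad.auth.appkey": "{aad-app-secret}",
+    "kusto.tables.topics.mapping": "[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'}]",
+    "tasks.max": "1"
+  }
+}'
+```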
diff --git a/config/connect-kusto-sink.properties b/config/connect-kusto-sink.properties
index 3705d65..1d4454e 100644
--- a/config/connect-kusto-sink.properties
+++ b/config/connect-kusto-sink.properties
@@ -3,7 +3,8 @@ connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
tasks.max=1
#topics=testing1,testing2
-#kusto.url=https://ingest-{cluster}.kusto.windows.net/
+#kusto.ingestion.url=https://ingest-{cluster}.kusto.windows.net/
+#kusto.query.url=https://{cluster}.kusto.windows.net/
#kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping'}]
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java
index 33999f3..5ba38d9 100644
--- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java
@@ -18,7 +18,6 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KustoSinkConfig extends AbstractConfig {
-
private static final Logger log = LoggerFactory.getLogger(KustoSinkConfig.class);
private static final String DLQ_PROPS_PREFIX = "misc.deadletterqueue.";
@@ -38,9 +37,13 @@ public class KustoSinkConfig extends AbstractConfig {
}
// TODO: this might need to be per kusto cluster...
- static final String KUSTO_URL_CONF = "kusto.url";
- private static final String KUSTO_URL_DOC = "Kusto ingestion service URI.";
- private static final String KUSTO_URL_DISPLAY = "Kusto cluster ingestion URI";
+ static final String KUSTO_INGEST_URL_CONF = "kusto.ingestion.url";
+ private static final String KUSTO_INGEST_URL_DOC = "Kusto ingestion endpoint URL.";
+ private static final String KUSTO_INGEST_URL_DISPLAY = "Kusto cluster ingestion URL";
+
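+    // The query (engine) endpoint is used at startup to validate table mappings and ingestion
+    // permissions before any records are sunk; ingestion itself goes through the ingestion URL.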
+ static final String KUSTO_ENGINE_URL_CONF = "kusto.query.url";
+ private static final String KUSTO_ENGINE_URL_DOC = "Kusto query endpoint URL.";
+ private static final String KUSTO_ENGINE_URL_DISPLAY = "Kusto cluster query URL";
static final String KUSTO_AUTH_APPID_CONF = "aad.auth.appid";
private static final String KUSTO_AUTH_APPID_DOC = "Application Id for Azure Active Directory authentication.";
@@ -93,7 +96,7 @@ public class KustoSinkConfig extends AbstractConfig {
static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "misc.deadletterqueue.bootstrap.servers";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC = "Configure this list to Kafka broker's address(es) "
+ "to which the Connector should write records failed due to restrictions while writing to the file in `tempdir.path`, network interruptions or unavailability of Kusto cluster. "
- + "This list should be in the form host-1:port-1,host-2:port-2,…host-n:port-n. ";
+ + "This list should be in the form host-1:port-1,host-2:port-2,…host-n:port-n.";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Miscellaneous Dead-Letter Queue Bootstrap Servers";
static final String KUSTO_DLQ_TOPIC_NAME_CONF = "misc.deadletterqueue.topic.name";
@@ -102,7 +105,7 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Miscellaneous Dead-Letter Queue Topic Name";
static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "errors.retry.max.time.ms";
- private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DOC = "Maximum time upto which the Connector "
+ private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DOC = "Maximum time up to which the Connector "
+ "should retry writing records to Kusto table in case of failures.";
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DISPLAY = "Errors Maximum Retry Time";
@@ -120,18 +123,16 @@ public class KustoSinkConfig extends AbstractConfig {
}
public static ConfigDef getConfig() {
-
ConfigDef result = new ConfigDef();
defineConnectionConfigs(result);
defineWriteConfigs(result);
- defineErrorHandlingAndRetriesConfgis(result);
+ defineErrorHandlingAndRetriesConfigs(result);
return result;
}
- private static void defineErrorHandlingAndRetriesConfgis(ConfigDef result) {
-
+ private static void defineErrorHandlingAndRetriesConfigs(ConfigDef result) {
final String errorAndRetriesGroupName = "Error Handling and Retries";
int errorAndRetriesGroupOrder = 0;
@@ -193,7 +194,6 @@ public class KustoSinkConfig extends AbstractConfig {
}
private static void defineWriteConfigs(ConfigDef result) {
-
final String writeGroupName = "Writes";
int writeGroupOrder = 0;
@@ -243,21 +243,30 @@ public class KustoSinkConfig extends AbstractConfig {
}
private static void defineConnectionConfigs(ConfigDef result) {
-
final String connectionGroupName = "Connection";
int connectionGroupOrder = 0;
result
.define(
- KUSTO_URL_CONF,
+ KUSTO_INGEST_URL_CONF,
Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
Importance.HIGH,
- KUSTO_URL_DOC,
+ KUSTO_INGEST_URL_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
- KUSTO_URL_DISPLAY)
+ KUSTO_INGEST_URL_DISPLAY)
+ .define(
+ KUSTO_ENGINE_URL_CONF,
+ Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ Importance.LOW,
+ KUSTO_ENGINE_URL_DOC,
+ connectionGroupName,
+ connectionGroupOrder++,
+ Width.MEDIUM,
+ KUSTO_ENGINE_URL_DISPLAY)
.define(
KUSTO_AUTH_APPKEY_CONF,
Type.PASSWORD,
@@ -291,7 +300,11 @@ public class KustoSinkConfig extends AbstractConfig {
}
public String getKustoUrl() {
- return this.getString(KUSTO_URL_CONF);
+ return this.getString(KUSTO_INGEST_URL_CONF);
+ }
+
+ public String getKustoEngineUrl() {
+ return this.getString(KUSTO_ENGINE_URL_CONF);
}
public String getAuthAppid() {
@@ -368,5 +381,4 @@ public class KustoSinkConfig extends AbstractConfig {
public static void main(String[] args) {
System.out.println(getConfig().toEnrichedRst());
}
-
}
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java
index 785152a..ecbf6cb 100644
--- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java
@@ -5,9 +5,9 @@ import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.ingest.IngestClient;
+import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
-import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -24,14 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Properties;
+import java.util.*;
/**
@@ -40,26 +33,36 @@ import java.util.Properties;
* Currently only ingested files are "committed" in the sense that we can advance the offset according to it.
*/
public class KustoSinkTask extends SinkTask {
-
+
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
-
- static final String FETCH_TABLE_QUERY = "%s|count";
- static final String FETCH_TABLE_MAPPING_QUERY = ".show table %s ingestion %s mapping '%s'";
- static final String FETCH_PRINCIPAL_ROLES_QUERY = ".show principal access with (principal = '%s', accesstype='ingest',database='%s',table='%s')";
- static final int INGESTION_ALLOWED_INDEX = 3;
-
+
+ public static final String FETCH_TABLE_QUERY = "%s | count";
+ public static final String FETCH_TABLE_MAPPING_QUERY = ".show table %s ingestion %s mapping '%s'";
+ public static final String FETCH_PRINCIPAL_ROLES_QUERY = ".show principal access with (principal = '%s', accesstype='ingest',database='%s',table='%s')";
+ public static final int INGESTION_ALLOWED_INDEX = 3;
+ public static final String MAPPING = "mapping";
+ public static final String MAPPING_FORMAT = "format";
+ public static final String MAPPING_TABLE = "table";
+ public static final String MAPPING_DB = "db";
+ public static final String JSON_FORMAT = "json";
+ public static final String SINGLEJSON_FORMAT = "singlejson";
+ public static final String MULTIJSON_FORMAT = "multijson";
+ public static final String VALIDATION_OK = "OK";
+
private final Set assignment;
private Map topicsToIngestionProps;
private KustoSinkConfig config;
- IngestClient kustoIngestClient;
- Map writers;
+ protected IngestClient kustoIngestClient;
+ protected Map writers;
private boolean isDlqEnabled;
private String dlqTopicName;
private Producer dlqProducer;
+ private static final ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
public KustoSinkTask() {
assignment = new HashSet<>();
writers = new HashMap<>();
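+        // Run the startup validation queries in "validate permissions" mode so they check access
+        // without executing against table data; a permitted principal simply gets back "OK".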
+ clientRequestProperties.setOption("validate_permissions", true);
}
public static IngestClient createKustoIngestClient(KustoSinkConfig config) {
@@ -90,7 +93,7 @@ public class KustoSinkTask extends SinkTask {
public static Client createKustoEngineClient(KustoSinkConfig config) {
try {
- String engineClientURL = config.getKustoUrl().replace("https://ingest-", "https://");
+ String engineClientURL = config.getKustoEngineUrl();
if (!Strings.isNullOrEmpty(config.getAuthAppid())) {
if (Strings.isNullOrEmpty(config.getAuthAppkey())) {
throw new ConfigException("Kusto authentication missing App Key.");
@@ -115,43 +118,39 @@ public class KustoSinkTask extends SinkTask {
}
public static Map getTopicsToIngestionProps(KustoSinkConfig config) {
- Map result = new HashMap<>();
+ Map result = new HashMap<>();
try {
-
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
-
- for (int i =0; i< mappings.length(); i++) {
-
+
+ for (int i = 0; i < mappings.length(); i++) {
JSONObject mapping = mappings.getJSONObject(i);
-
- String db = mapping.getString("db");
- String table = mapping.getString("table");
-
- String format = mapping.optString("format");
+
+ String db = mapping.getString(MAPPING_DB);
+ String table = mapping.getString(MAPPING_TABLE);
+
+ String format = mapping.optString(MAPPING_FORMAT);
IngestionProperties props = new IngestionProperties(db, table);
-
+
if (format != null && !format.isEmpty()) {
- if (format.equalsIgnoreCase("json") || format.equalsIgnoreCase("singlejson") || format.equalsIgnoreCase("multijson")) {
- props.setDataFormat("multijson");
+ if (format.equalsIgnoreCase(JSON_FORMAT) || format.equalsIgnoreCase(SINGLEJSON_FORMAT) || format.equalsIgnoreCase(MULTIJSON_FORMAT)) {
+ props.setDataFormat(MULTIJSON_FORMAT);
}
props.setDataFormat(format);
}
- String mappingRef = mapping.optString("mapping");
-
- if (mappingRef != null && !mappingRef.isEmpty()) {
- if (format != null) {
- if (format.equalsIgnoreCase("json") || format.equalsIgnoreCase("singlejson") || format.equalsIgnoreCase("multijson")) {
- props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
- } else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())){
- props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
- } else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.apacheavro.toString())){
- props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.ApacheAvro);
- } else {
- props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Csv);
- }
+ String mappingRef = mapping.optString(MAPPING);
+
+ if (mappingRef != null && !mappingRef.isEmpty() && format != null) {
+ if (format.equalsIgnoreCase(JSON_FORMAT) || format.equalsIgnoreCase(SINGLEJSON_FORMAT) || format.equalsIgnoreCase(MULTIJSON_FORMAT)) {
+ props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
+ } else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())) {
+ props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
+ } else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.apacheavro.toString())) {
+ props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.ApacheAvro);
+ } else {
+ props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Csv);
}
}
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
@@ -159,8 +158,7 @@ public class KustoSinkTask extends SinkTask {
result.put(mapping.getString("topic"), topicIngestionProperties);
}
return result;
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
throw new ConfigException("Error while parsing kusto ingestion properties.", ex);
}
}
@@ -176,31 +174,27 @@ public class KustoSinkTask extends SinkTask {
Client engineClient = createKustoEngineClient(config);
if (config.getTopicToTableMapping() != null) {
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
- if(mappings.length() > 0) {
- if(isIngestorRole(mappings.getJSONObject(0), engineClient)) {
- for (int i = 0; i < mappings.length(); i++) {
- JSONObject mapping = mappings.getJSONObject(i);
- validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList);
- }
+ if ((mappings.length() > 0) && (isIngestorRole(mappings.getJSONObject(0), engineClient))) {
+ for (int i = 0; i < mappings.length(); i++) {
+ JSONObject mapping = mappings.getJSONObject(i);
+ validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList);
}
}
}
String tableAccessErrorMessage = "";
- if(!databaseTableErrorList.isEmpty())
- {
+ if (!databaseTableErrorList.isEmpty()) {
tableAccessErrorMessage = "\n\nError occurred while trying to access the following database:table\n" +
- String.join("\n",databaseTableErrorList);
+ String.join("\n", databaseTableErrorList);
}
- if(!accessErrorList.isEmpty())
- {
+ if (!accessErrorList.isEmpty()) {
tableAccessErrorMessage = tableAccessErrorMessage + "\n\nUser does not have appropriate permissions " +
"to sink data into the Kusto database:table combination(s). " +
"Verify your Kusto principals and roles before proceeding for the following: \n " +
- String.join("\n",accessErrorList);
+ String.join("\n", accessErrorList);
}
- if(!tableAccessErrorMessage.isEmpty()) {
+ if (!tableAccessErrorMessage.isEmpty()) {
throw new ConnectException(tableAccessErrorMessage);
}
} catch (JSONException e) {
@@ -209,12 +203,12 @@ public class KustoSinkTask extends SinkTask {
}
private boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException {
- String database = testMapping.getString("db");
- String table = testMapping.getString("table");
+ String database = testMapping.getString(MAPPING_DB);
+ String table = testMapping.getString(MAPPING_TABLE);
try {
- KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table));
- } catch(DataServiceException | DataClientException err){
- if(err.getCause().getMessage().contains("Forbidden:")){
+ engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table), clientRequestProperties);
+ } catch (DataServiceException | DataClientException err) {
+ if (err.getCause().getMessage().contains("Forbidden:")) {
log.warn("User might have ingestor privileges, table validation will be skipped for all table mappings ");
return false;
}
@@ -223,50 +217,44 @@ public class KustoSinkTask extends SinkTask {
}
/**
- * This function validates whether the user has the read and write access to the intended table
- * before starting to sink records into ADX.
+ * This function validates whether the user has the read and write access to the intended table
+ * before starting to sink records into ADX.
+ *
* @param engineClient Client connection to run queries.
- * @param mapping JSON Object containing a Table mapping.
- * @param config
+ * @param mapping JSON Object containing a Table mapping.
+ * @param config Kusto Sink configuration
*/
private static void validateTableAccess(Client engineClient, JSONObject mapping, KustoSinkConfig config, List databaseTableErrorList, List accessErrorList) throws JSONException {
-
- String database = mapping.getString("db");
- String table = mapping.getString("table");
- String format = mapping.getString("format");
- String mappingName = mapping.getString("mapping");
- if (format.equalsIgnoreCase("json") || format.equalsIgnoreCase("singlejson") || format.equalsIgnoreCase("multijson")) {
- format = "json";
+ String database = mapping.getString(MAPPING_DB);
+ String table = mapping.getString(MAPPING_TABLE);
+ String format = mapping.getString(MAPPING_FORMAT);
+ String mappingName = mapping.getString(MAPPING);
+ if (format.equalsIgnoreCase(JSON_FORMAT) || format.equalsIgnoreCase(SINGLEJSON_FORMAT) || format.equalsIgnoreCase(MULTIJSON_FORMAT)) {
+ format = JSON_FORMAT;
}
-
boolean hasAccess = false;
try {
try {
- KustoResultSetTable rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table)).getPrimaryResults();
- rs.next();
- if (rs.getLong(0) >= 0) {
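+                // With the validate_permissions option set on the request, the count query returns the
+                // single value "OK" when the principal can access the table, instead of a row count.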
+ KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table), clientRequestProperties);
+ if (VALIDATION_OK.equals(rs.getPrimaryResults().getData().get(0).get(0))) {
hasAccess = true;
}
-
} catch (DataServiceException e) {
databaseTableErrorList.add(String.format("Database:%s Table:%s | table not found", database, table));
}
- if(hasAccess) {
+ if (hasAccess) {
try {
- KustoOperationResult rp = engineClient.execute(database, String.format(FETCH_TABLE_MAPPING_QUERY, table, format, mappingName));
- if (rp.getPrimaryResults().getData().get(0).get(0).toString().equals(mappingName)) {
- hasAccess = true;
- }
+ engineClient.execute(database, String.format(FETCH_TABLE_MAPPING_QUERY, table, format, mappingName));
} catch (DataServiceException e) {
hasAccess = false;
databaseTableErrorList.add(String.format("Database:%s Table:%s | %s mapping '%s' not found", database, table, format, mappingName));
-
}
}
- if(hasAccess) {
+ if (hasAccess) {
+ String authenticateWith = "aadapp=" + config.getAuthAppid();
+ String query = String.format(FETCH_PRINCIPAL_ROLES_QUERY, authenticateWith, database, table);
try {
- String authenticateWith = "aadapp=" + config.getAuthAppid();
- KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_PRINCIPAL_ROLES_QUERY, authenticateWith, database, table));
+ KustoOperationResult rs = engineClient.execute(database, query);
hasAccess = (boolean) rs.getPrimaryResults().getData().get(0).get(INGESTION_ALLOWED_INDEX);
if (hasAccess) {
log.info("User has appropriate permissions to sink data into the Kusto table={}", table);
@@ -277,12 +265,11 @@ public class KustoSinkTask extends SinkTask {
} catch (DataServiceException e) {
// Logging the error so that the trace is not lost.
if (!e.getCause().toString().contains("Forbidden")){
- log.error("{}", e);
+ log.error("Error fetching principal roles with query {}", query, e);
databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table));
} else {
log.warn("Failed to check permissions, will continue the run as the principal might still be able to ingest: {}", e);
}
-
}
}
} catch (DataClientException e) {
@@ -296,11 +283,11 @@ public class KustoSinkTask extends SinkTask {
}
@Override
- public void open(Collection partitions) throws ConnectException {
+ public void open(Collection partitions) {
assignment.addAll(partitions);
for (TopicPartition tp : assignment) {
TopicIngestionProperties ingestionProps = getIngestionProps(tp.topic());
- log.debug(String.format("Open Kusto topic: '%s' with partition: '%s'", tp.topic(), tp.partition()));
+ log.debug("Open Kusto topic: '{}' with partition: '{}'", tp.topic(), tp.partition());
if (ingestionProps == null) {
throw new ConnectException(String.format("Kusto Sink has no ingestion props mapped " +
"for the topic: %s. please check your configuration.", tp.topic()));
@@ -320,17 +307,16 @@ public class KustoSinkTask extends SinkTask {
writers.remove(tp);
assignment.remove(tp);
} catch (ConnectException e) {
- log.error("Error closing writer for {}. Error: {}", tp, e);
+ log.error("Error closing writer for {}.", tp, e);
}
}
}
@Override
public void start(Map props) {
-
config = new KustoSinkConfig(props);
String url = config.getKustoUrl();
-
+
validateTableMappings(config);
if (config.isDlqEnabled()) {
isDlqEnabled = true;
@@ -348,16 +334,15 @@ public class KustoSinkTask extends SinkTask {
isDlqEnabled = false;
dlqTopicName = null;
}
-
+
topicsToIngestionProps = getTopicsToIngestionProps(config);
-
+
// this should be read properly from settings
kustoIngestClient = createKustoIngestClient(config);
-
- log.info(String.format("Started KustoSinkTask with target cluster: (%s), source topics: (%s)",
- url, topicsToIngestionProps.keySet().toString()));
+
+ log.info("Started KustoSinkTask with target cluster: ({}), source topics: ({})", url, topicsToIngestionProps.keySet());
// Adding this check to make code testable
- if(context!=null) {
+ if (context != null) {
open(context.assignment());
}
}
@@ -369,7 +354,7 @@ public class KustoSinkTask extends SinkTask {
writer.close();
}
try {
- if(kustoIngestClient != null) {
+ if (kustoIngestClient != null) {
kustoIngestClient.close();
}
} catch (IOException e) {
@@ -378,10 +363,10 @@ public class KustoSinkTask extends SinkTask {
}
@Override
- public void put(Collection records) throws ConnectException {
+ public void put(Collection records) {
SinkRecord lastRecord = null;
for (SinkRecord record : records) {
- log.debug("record to topic:" + record.topic());
+ log.debug("Record to topic: {}", record.topic());
lastRecord = record;
TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
@@ -398,7 +383,7 @@ public class KustoSinkTask extends SinkTask {
}
if (lastRecord != null) {
- log.debug("Last record offset:" + lastRecord.kafkaOffset());
+ log.debug("Last record offset: {}", lastRecord.kafkaOffset());
}
}
@@ -410,7 +395,7 @@ public class KustoSinkTask extends SinkTask {
) {
Map offsetsToCommit = new HashMap<>();
for (TopicPartition tp : assignment) {
- if(writers.get(tp) == null) {
+ if (writers.get(tp) == null) {
throw new ConnectException("Topic Partition not configured properly. " +
"verify your `topics` and `kusto.tables.topics.mapping` configurations");
}
@@ -418,7 +403,7 @@ public class KustoSinkTask extends SinkTask {
Long lastCommittedOffset = writers.get(tp).lastCommittedOffset;
if (lastCommittedOffset != null) {
- Long offset = lastCommittedOffset + 1L;
+ long offset = lastCommittedOffset + 1L;
log.debug("Forwarding to framework request to commit offset: {} for {} while the offset is {}", offset, tp, offsets.get(tp));
offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
}
@@ -430,6 +415,5 @@ public class KustoSinkTask extends SinkTask {
@Override
public void flush(Map offsets) {
// do nothing , rolling files can handle writing
-
}
-}
+}
\ No newline at end of file
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java
index dd33fd6..0d8c570 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java
@@ -1,8 +1,8 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
-import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
@@ -25,23 +25,17 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.Properties;
-
+import java.util.*;
import java.util.logging.Logger;
public class E2ETest {
private static final String testPrefix = "tmpKafkaE2ETest";
- private String appId = System.getProperty("appId");
- private String appKey = System.getProperty("appKey");
- private String authority = System.getProperty("authority");
- private String cluster = System.getProperty("cluster");
- private String database = System.getProperty("database");
- private String tableBaseName = System.getProperty("table", testPrefix + UUID.randomUUID().toString().replace('-', '_'));
+ private static final String appId = System.getProperty("appId");
+ private static final String appKey = System.getProperty("appKey");
+ private static final String authority = System.getProperty("authority");
+ private static final String cluster = System.getProperty("cluster");
+ private static final String database = System.getProperty("database");
+ private static final String tableBaseName = System.getProperty("table", testPrefix + UUID.randomUUID().toString().replace('-', '_'));
private String basePath = Paths.get("src/test/resources/", "testE2E").toString();
private Logger log = Logger.getLogger(this.getClass().getName());
private boolean isDlqEnabled;
@@ -49,14 +43,14 @@ public class E2ETest {
private Producer kafkaProducer;
@Before
- public void setUp(){
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "localhost:9000");
- properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- kafkaProducer = new KafkaProducer<>(properties);
- isDlqEnabled = false;
- dlqTopicName = null;
+ public void setUp() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "localhost:9000");
+ properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ kafkaProducer = new KafkaProducer<>(properties);
+ isDlqEnabled = false;
+ dlqTopicName = null;
}
@Test
@@ -90,14 +84,15 @@ public class E2ETest {
props.ingestionProperties = ingestionProperties;
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
props.ingestionProperties.setIngestionMapping("mappy", IngestionMapping.IngestionMappingKind.Csv);
- String KustoUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
+ String kustoDmUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
+ String kustoEngineUrl = String.format("https://%s.kusto.windows.net", cluster);
String basepath = Paths.get(basePath, "csv").toString();
- Map settings = getKustoConfigs(KustoUrl, basepath, "mappy", fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ Map settings = getKustoConfigs(kustoDmUrl, kustoEngineUrl, basepath, "mappy", fileThreshold, flushInterval);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
- List records = new ArrayList();
+ List records = new ArrayList<>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.BYTES_SCHEMA, messages[0].getBytes(), 10));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, messages[0].getBytes(), 10));
@@ -142,15 +137,16 @@ public class E2ETest {
props2.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
props2.ingestionProperties.setIngestionMapping("avroMapping", IngestionMapping.IngestionMappingKind.Avro);
TopicPartition tp2 = new TopicPartition("testPartition2", 11);
- String KustoUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
+ String kustoDmUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
+ String kustoEngineUrl = String.format("https://%s.kusto.windows.net", cluster);
String basepath = Paths.get(basePath, "avro").toString();
long fileThreshold = 100;
long flushInterval = 300000;
- Map settings = getKustoConfigs(KustoUrl, basepath, "avri", fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ Map settings = getKustoConfigs(kustoDmUrl, kustoEngineUrl, basepath, "avri", fileThreshold, flushInterval);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer2.open();
- List records2 = new ArrayList();
+ List records2 = new ArrayList<>();
FileInputStream fs = new FileInputStream("src/test/resources/data.avro");
byte[] buffer = new byte[1184];
@@ -177,10 +173,10 @@ public class E2ETest {
KustoResultSetTable res = engineClient.execute(database, query).getPrimaryResults();
res.next();
- Integer timeoutMs = 60 * 6 * 1000;
- Integer rowCount = res.getInt(0);
- Integer timeElapsedMs = 0;
- Integer sleepPeriodMs = 5 * 1000;
+ int timeoutMs = 60 * 6 * 1000;
+ int rowCount = res.getInt(0);
+ int timeElapsedMs = 0;
+ int sleepPeriodMs = 5 * 1000;
while (rowCount < expectedNumberOfRows && timeElapsedMs < timeoutMs) {
Thread.sleep(sleepPeriodMs);
@@ -190,13 +186,14 @@ public class E2ETest {
timeElapsedMs += sleepPeriodMs;
}
Assertions.assertEquals(rowCount, expectedNumberOfRows);
- this.log.info("Succesfully ingested " + expectedNumberOfRows + " records.");
+ this.log.info("Successfully ingested " + expectedNumberOfRows + " records.");
}
- private Map getKustoConfigs(String clusterUrl, String basePath,String tableMapping, long fileThreshold,
- long flushInterval) {
+ private Map getKustoConfigs(String clusterUrl, String engineUrl, String basePath, String tableMapping,
+ long fileThreshold, long flushInterval) {
Map settings = new HashMap<>();
- settings.put(KustoSinkConfig.KUSTO_URL_CONF, clusterUrl);
+ settings.put(KustoSinkConfig.KUSTO_INGEST_URL_CONF, clusterUrl);
+ settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, engineUrl);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, tableMapping);
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, appId);
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, appKey);
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java
index 02402fc..2649aaf 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriterTest.java
@@ -8,32 +8,17 @@ import org.apache.commons.io.IOUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.Assert;
-import java.io.File;
-import java.io.IOException;
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.FileInputStream;
+import java.io.*;
import java.nio.file.Paths;
import java.time.Instant;
-import java.util.HashMap;
-import java.util.Objects;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.AbstractMap;
-
-
-
+import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
public class FileWriterTest {
private File currentDirectory;
@@ -68,12 +53,13 @@ public class FileWriterTest {
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
- Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
+ Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;
- Consumer trackFiles = (SourceFile f) -> {};
+ Consumer trackFiles = (SourceFile f) -> {
+ };
Function generateFileName = (Long l) -> FILE_PATH;
@@ -82,8 +68,8 @@ public class FileWriterTest {
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, msg.getBytes(), 10);
fileWriter.initializeRecordWriter(record);
fileWriter.openFile(null);
- Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
- Assert.assertEquals(fileWriter.currentFile.rawBytes, 0);
+ Assert.assertEquals(1, Objects.requireNonNull(folder.listFiles()).length);
+ Assert.assertEquals(0, fileWriter.currentFile.rawBytes);
Assert.assertEquals(fileWriter.currentFile.path, FILE_PATH);
Assert.assertTrue(fileWriter.currentFile.file.canWrite());
@@ -97,11 +83,9 @@ public class FileWriterTest {
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
-
- Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
+ Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
HashMap files = new HashMap<>();
-
final int MAX_FILE_SIZE = 100;
Consumer trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes);
@@ -116,21 +100,21 @@ public class FileWriterTest {
fileWriter.writeData(record1);
}
- Assert.assertEquals(files.size(), 4);
+ Assert.assertEquals(4, files.size());
// should still have 1 open file at this point...
- Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
+ Assert.assertEquals(1, Objects.requireNonNull(folder.listFiles()).length);
// close current file
fileWriter.close();
- Assert.assertEquals(files.size(), 5);
+ Assert.assertEquals(5, files.size());
List sortedFiles = new ArrayList<>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
Assert.assertEquals(sortedFiles, Arrays.asList((long) 108, (long) 108, (long) 108, (long) 108, (long) 54));
// make sure folder is clear once done
- Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
+ Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
}
@Test
@@ -157,9 +141,9 @@ public class FileWriterTest {
Thread.sleep(1000);
- Assert.assertEquals(files.size(), 0);
+ Assert.assertEquals(0, files.size());
fileWriter.close();
- Assert.assertEquals(files.size(), 1);
+ Assert.assertEquals(1, files.size());
String path2 = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2_2").toString();
File folder2 = new File(path2);
@@ -175,7 +159,7 @@ public class FileWriterTest {
fileWriter2.writeData(record1);
Thread.sleep(1050);
- Assert.assertEquals(files.size(), 2);
+ Assert.assertEquals(2, files.size());
List sortedFiles = new ArrayList<>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
@@ -183,7 +167,7 @@ public class FileWriterTest {
// make sure folder is clear once done
fileWriter2.close();
- Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
+ Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
}
@Test
@@ -212,7 +196,7 @@ public class FileWriterTest {
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
Function generateFileName = (Long offset) -> {
- if(offset == null){
+ if (offset == null) {
offset = offsets.currentOffset;
}
return Paths.get(path, Long.toString(offset)).toString();
@@ -247,16 +231,19 @@ public class FileWriterTest {
Thread.sleep(510);
// Assertions
- Assert.assertEquals(files.size(), 2);
+ Assert.assertEquals(2, files.size());
// Make sure that the first file is from offset 1 till 2 and second is from 3 till 3
- Assert.assertEquals(files.stream().map(Map.Entry::getValue).toArray(Long[]::new), new Long[]{30L, 15L});
- Assert.assertEquals(files.stream().map((s)->s.getKey().substring(path.length() + 1)).toArray(String[]::new), new String[]{"1", "3"});
- Assert.assertEquals(committedOffsets, new ArrayList(){{add(2L);add(3L);}});
+ Assert.assertEquals(new Long[]{30L, 15L}, files.stream().map(Map.Entry::getValue).toArray(Long[]::new));
+ Assert.assertEquals(new String[]{"1", "3"}, files.stream().map((s) -> s.getKey().substring(path.length() + 1)).toArray(String[]::new));
+ Assert.assertEquals(committedOffsets, new ArrayList() {{
+ add(2L);
+ add(3L);
+ }});
// make sure folder is clear once done
fileWriter2.close();
- Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
+ Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
}
static Function getAssertFileConsumerFunction(String msg) {
@@ -289,14 +276,16 @@ public class FileWriterTest {
}
protected Map getProperties() {
- Map settings = new HashMap<>();
- settings.put(KustoSinkConfig.KUSTO_URL_CONF, "xxx");
- settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
- return settings;
+ Map settings = new HashMap<>();
+ settings.put(KustoSinkConfig.KUSTO_INGEST_URL_CONF, "xxx");
+ settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, "xxx");
+ settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
+ settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
+ settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
+ settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
+ return settings;
}
+
static Consumer getAssertFileConsumer(String msg) {
return (SourceFile f) -> {
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
@@ -324,4 +313,4 @@ public class FileWriterTest {
}
};
}
-}
+}
\ No newline at end of file
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java
index 6212cd4..c5cee71 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConnectorConfigTest.java
@@ -1,57 +1,35 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.Before;
-import org.junit.Test;
-
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class KustoSinkConnectorConfigTest {
- Map settings;
- KustoSinkConfig config;
-
- @Before
- public void before() {
- settings = new HashMap<>();
- config = null;
- }
+ private static final String DM_URL = "https://ingest-cluster_name.kusto.windows.net";
+ private static final String ENGINE_URL = "https://cluster_name.kusto.windows.net";
@Test
public void shouldAcceptValidConfig() {
// Adding required Configuration with no default value.
- settings.put("kusto.tables.topics.mapping","[{'topic': 'xxx','db': 'xxx', 'table': 'xxx','format': 'avro', 'mapping':'avri'}]");
- settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
- settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
- config = new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(setupConfigs());
assertNotNull(config);
}
@Test
public void shouldHaveDefaultValues() {
// Adding required Configuration with no default value.
- settings.put("kusto.tables.topics.mapping","[{'topic': 'xxx','db': 'xxx', 'table': 'xxx','format': 'avro', 'mapping':'avri'}]");
- settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
- settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
- config = new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(setupConfigs());
assertNotNull(config.getKustoUrl());
- assertNotNull(config.getFlushSizeBytes());
- assertNotNull(config.getFlushInterval());
+ assertNotEquals(0, config.getFlushSizeBytes());
+ assertNotEquals(0, config.getFlushInterval());
assertFalse(config.isDlqEnabled());
assertEquals(BehaviorOnError.FAIL, config.getBehaviorOnError());
}
@@ -59,45 +37,53 @@ public class KustoSinkConnectorConfigTest {
@Test(expected = ConfigException.class)
public void shouldThrowExceptionWhenKustoURLNotGiven() {
// Adding required Configuration with no default value.
- settings.remove(KustoSinkConfig.KUSTO_URL_CONF);
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
- config = new KustoSinkConfig(settings);
+ HashMap<String, String> settings = setupConfigs();
+ settings.remove(KustoSinkConfig.KUSTO_INGEST_URL_CONF);
+ new KustoSinkConfig(settings);
}
-
+
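+ // No engine URL is configured and none can be inferred from the ingest URL, so the ingest URL is expected back.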
+ @Test
+ public void shouldUseDmUrlWhenKustoEngineUrlNotGivenAndCantGuess() {
+ HashMap<String, String> settings = setupConfigs();
+ settings.remove(KustoSinkConfig.KUSTO_ENGINE_URL_CONF);
+ settings.put(KustoSinkConfig.KUSTO_INGEST_URL_CONF, ENGINE_URL);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
+ String kustoEngineUrl = config.getKustoEngineUrl();
+ assertEquals(ENGINE_URL, kustoEngineUrl);
+ }
+
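+ // An explicitly configured engine URL is used as-is.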
+ @Test
+ public void shouldUseKustoEngineUrlWhenGiven() {
+ HashMap<String, String> settings = setupConfigs();
+ settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, ENGINE_URL);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
+ String kustoEngineUrl = config.getKustoEngineUrl();
+ assertEquals(ENGINE_URL, kustoEngineUrl);
+ }
+
@Test(expected = ConfigException.class)
public void shouldThrowExceptionWhenAppIdNotGiven() {
// Adding required Configuration with no default value.
- settings.remove(KustoSinkConfig.KUSTO_URL_CONF);
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
- config = new KustoSinkConfig(settings);
+ HashMap<String, String> settings = setupConfigs();
+ settings.remove(KustoSinkConfig.KUSTO_AUTH_APPID_CONF);
+ new KustoSinkConfig(settings);
}
-
+
@Test(expected = ConfigException.class)
public void shouldFailWhenBehaviorOnErrorIsIllConfigured() {
// Adding required Configuration with no default value.
- settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
- settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
+ HashMap<String, String> settings = setupConfigs();
+ settings.remove(KustoSinkConfig.KUSTO_INGEST_URL_CONF);
settings.put(KustoSinkConfig.KUSTO_BEHAVIOR_ON_ERROR_CONF, "DummyValue");
- config = new KustoSinkConfig(settings);
+ new KustoSinkConfig(settings);
}
-
+
@Test
public void verifyDlqSettings() {
- settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
- settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
+ HashMap<String, String> settings = setupConfigs();
settings.put(KustoSinkConfig.KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF, "localhost:8081,localhost:8082");
settings.put(KustoSinkConfig.KUSTO_DLQ_TOPIC_NAME_CONF, "dlq-error-topic");
- config = new KustoSinkConfig(settings);
-
+ KustoSinkConfig config = new KustoSinkConfig(settings);
+
assertTrue(config.isDlqEnabled());
assertEquals(Arrays.asList("localhost:8081", "localhost:8082"), config.getDlqBootstrapServers());
assertEquals("dlq-error-topic", config.getDlqTopicName());
@@ -106,16 +92,11 @@ public class KustoSinkConnectorConfigTest {
@Test
public void shouldProcessDlqConfigsWithPrefix() {
// Adding required Configuration with no default value.
- settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
- settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
-
+ HashMap<String, String> settings = setupConfigs();
settings.put("misc.deadletterqueue.security.protocol", "SASL_PLAINTEXT");
settings.put("misc.deadletterqueue.sasl.mechanism", "PLAIN");
- config = new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
assertNotNull(config);
@@ -125,4 +106,14 @@ public class KustoSinkConnectorConfigTest {
assertEquals("PLAIN", dlqProps.get("sasl.mechanism"));
}
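+ // Minimal valid connector configuration shared by these tests and reused by KustoSinkTaskTest.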
+ public static HashMap<String, String> setupConfigs() {
+ HashMap<String, String> configs = new HashMap<>();
+ configs.put(KustoSinkConfig.KUSTO_INGEST_URL_CONF, DM_URL);
+ configs.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, ENGINE_URL);
+ configs.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
+ configs.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
+ configs.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
+ configs.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
+ return configs;
+ }
}
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java
index 6befb99..ee68b7b 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTaskTest.java
@@ -17,11 +17,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
-
public class KustoSinkTaskTest {
File currentDirectory;
@@ -40,19 +40,12 @@ public class KustoSinkTaskTest {
}
@Test
- public void testSinkTaskOpen() throws Exception {
- HashMap<String, String> props = new HashMap<>();
- props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
-
- props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
- props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
-
+ public void testSinkTaskOpen() {
+ HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
KustoSinkTask kustoSinkTask = new KustoSinkTask();
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.any());
- kustoSinkTaskSpy.start(props);
+ kustoSinkTaskSpy.start(configs);
ArrayList<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic1", 1));
tps.add(new TopicPartition("topic1", 2));
@@ -60,23 +53,16 @@ public class KustoSinkTaskTest {
kustoSinkTaskSpy.open(tps);
- assertEquals(kustoSinkTaskSpy.writers.size(), 3);
+ assertEquals(3, kustoSinkTaskSpy.writers.size());
}
@Test
- public void testSinkTaskPutRecord() throws Exception {
- HashMap<String, String> props = new HashMap<>();
- props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
- props.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, System.getProperty("java.io.tmpdir"));
- props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'testing1','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
- props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
-
+ public void testSinkTaskPutRecord() {
+ HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
KustoSinkTask kustoSinkTask = new KustoSinkTask();
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.any());
- kustoSinkTaskSpy.start(props);
+ kustoSinkTaskSpy.start(configs);
ArrayList<TopicPartition> tps = new ArrayList<>();
TopicPartition tp = new TopicPartition("topic1", 1);
@@ -84,68 +70,55 @@ public class KustoSinkTaskTest {
kustoSinkTaskSpy.open(tps);
- List<SinkRecord> records = new ArrayList();
+ List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
kustoSinkTaskSpy.put(records);
- assertEquals(kustoSinkTaskSpy.writers.get(tp).currentOffset, 10);
+ assertEquals(10, kustoSinkTaskSpy.writers.get(tp).currentOffset);
}
@Test
- public void testSinkTaskPutRecordMissingPartition() throws Exception {
- HashMap<String, String> props = new HashMap<>();
- props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
- props.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, System.getProperty("java.io.tmpdir"));
- props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
- props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
-
+ public void testSinkTaskPutRecordMissingPartition() {
+ HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
+ configs.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, System.getProperty("java.io.tmpdir"));
KustoSinkTask kustoSinkTask = new KustoSinkTask();
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.any());
- kustoSinkTaskSpy.start(props);
+ kustoSinkTaskSpy.start(configs);
ArrayList<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic1", 1));
kustoSinkTaskSpy.open(tps);
- List<SinkRecord> records = new ArrayList();
+ List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord("topic2", 1, null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
Throwable exception = assertThrows(ConnectException.class, () -> kustoSinkTaskSpy.put(records));
- assertEquals(exception.getMessage(), "Received a record without a mapped writer for topic:partition(topic2:1), dropping record.");
-
+ assertEquals("Received a record without a mapped writer for topic:partition(topic2:1), dropping record.", exception.getMessage());
}
@Test
public void getTable() {
- HashMap<String, String> props = new HashMap<>();
- props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
- props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
- props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
- props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
- props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
-
+ HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
KustoSinkTask kustoSinkTask = new KustoSinkTask();
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.any());
- kustoSinkTaskSpy.start(props);
+ kustoSinkTaskSpy.start(configs);
{
// single table mapping should cause all topics to be mapped to a single table
- Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDatabaseName(), "db1");
- Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getTableName(), "table1");
- Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDataFormat(), "csv");
- Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDatabaseName(), "db2");
- Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getTableName(), "table2");
- Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat(), "json");
- Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference(), "Mapping");
+ Assert.assertEquals("db1", kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDatabaseName());
+ Assert.assertEquals("table1", kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getTableName());
+ Assert.assertEquals("csv", kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDataFormat());
+ Assert.assertEquals("db2", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDatabaseName());
+ Assert.assertEquals("table2", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getTableName());
+ Assert.assertEquals("json", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat());
+ Assert.assertEquals("Mapping", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference());
Assert.assertNull(kustoSinkTaskSpy.getIngestionProps("topic3"));
}
}
-}
+}
\ No newline at end of file
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java
index 9b8f8a8..b0ec5ea 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriterTest.java
@@ -11,10 +11,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.junit.Assert;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -22,18 +22,15 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import static org.mockito.Mockito.*;
public class TopicPartitionWriterTest {
// TODO: should probably find a better way to mock internal class (FileWriter)...
private File currentDirectory;
- private static final String KUSTO_CLUSTER_URL = "https://ingest-cluster.kusto.windows.net";
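+ // Distinct constants for the ingest (DM) cluster endpoint and the engine cluster endpoint.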
+ private static final String KUSTO_INGEST_CLUSTER_URL = "https://ingest-cluster.kusto.windows.net";
+ private static final String KUSTO_CLUSTER_URL = "https://cluster.kusto.windows.net";
private static final String DATABASE = "testdb1";
private static final String TABLE = "testtable1";
private boolean isDlqEnabled;
@@ -76,13 +73,13 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = ingestionProperties;
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
SourceFile descriptor = new SourceFile();
descriptor.rawBytes = 1024;
descriptor.path = "somepath/somefile";
- descriptor.file = new File ("C://myfile.txt");
+ descriptor.file = new File("C://myfile.txt");
writer.handleRollFile(descriptor);
ArgumentCaptor<FileSourceInfo> fileSourceInfoArgument = ArgumentCaptor.forClass(FileSourceInfo.class);
@@ -96,7 +93,7 @@ public class TopicPartitionWriterTest {
Assert.assertEquals(fileSourceInfoArgument.getValue().getFilePath(), descriptor.path);
Assert.assertEquals(TABLE, ingestionPropertiesArgumentCaptor.getValue().getTableName());
Assert.assertEquals(DATABASE, ingestionPropertiesArgumentCaptor.getValue().getDatabaseName());
- Assert.assertEquals(fileSourceInfoArgument.getValue().getRawSizeInBytes(), 1024);
+ Assert.assertEquals(1024, fileSourceInfoArgument.getValue().getRawSizeInBytes());
}
@Test
@@ -111,7 +108,7 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
Assert.assertEquals(writer.getFilePath(null), Paths.get(config.getTempDirPath(), "kafka_testTopic_11_0.csv.gz").toString());
@@ -128,7 +125,7 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
List<SinkRecord> records = new ArrayList<>();
@@ -155,7 +152,7 @@ public class TopicPartitionWriterTest {
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
writer.close();
@@ -188,7 +185,7 @@ public class TopicPartitionWriterTest {
}
@Test
- public void testWriteStringyValuesAndOffset() throws Exception {
+ public void testWriteStringyValuesAndOffset() {
TopicPartition tp = new TopicPartition("testTopic", 2);
IngestClient mockClient = mock(IngestClient.class);
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
@@ -199,11 +196,11 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
- List<SinkRecord> records = new ArrayList();
+ List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, "another,stringy,message", 3));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, "{'also':'stringy','sortof':'message'}", 4));
@@ -221,7 +218,7 @@ public class TopicPartitionWriterTest {
TopicPartition tp = new TopicPartition("testPartition", 11);
IngestClient mockClient = mock(IngestClient.class);
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
- String[] messages = new String[]{ "stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"};
+ String[] messages = new String[]{"stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"};
// Expect to finish the file after writing the fourth message because of fileThreshold
long fileThreshold = messages[0].length() + messages[1].length() + messages[2].length() + messages[2].length() - 1;
@@ -230,11 +227,11 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
- List<SinkRecord> records = new ArrayList();
+ List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[0], 10));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[1], 13));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[2], 14));
@@ -245,8 +242,8 @@ public class TopicPartitionWriterTest {
writer.writeRecord(record);
}
- Assert.assertEquals((long) writer.lastCommittedOffset, (long) 15);
- Assert.assertEquals(writer.currentOffset, 16);
+ Assert.assertEquals(15, (long) writer.lastCommittedOffset);
+ Assert.assertEquals(16, writer.currentOffset);
String currentFileName = writer.fileWriter.currentFile.path;
Assert.assertEquals(currentFileName, Paths.get(config.getTempDirPath(), String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 15, IngestionProperties.DATA_FORMAT.csv.name())).toString());
@@ -277,19 +274,19 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
- KustoSinkConfig config= new KustoSinkConfig(settings);
+ KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
- List<SinkRecord> records = new ArrayList();
+ List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.BYTES_SCHEMA, o.toByteArray(), 10));
for (SinkRecord record : records) {
writer.writeRecord(record);
}
- Assert.assertEquals((long) writer.lastCommittedOffset, (long) 10);
- Assert.assertEquals(writer.currentOffset, 10);
+ Assert.assertEquals(10, (long) writer.lastCommittedOffset);
+ Assert.assertEquals(10, writer.currentOffset);
String currentFileName = writer.fileWriter.currentFile.path;
@@ -300,7 +297,8 @@ public class TopicPartitionWriterTest {
private Map<String, String> getKustoConfigs(String basePath, long fileThreshold,
long flushInterval) {
Map<String, String> settings = new HashMap<>();
- settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
+ settings.put(KustoSinkConfig.KUSTO_INGEST_URL_CONF, KUSTO_INGEST_CLUSTER_URL);
+ settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");