Phase 1: Add kusto.engine.url as an optional param, which is used to verify table mappings and access.
This commit is contained in:
Родитель
cb16235aaa
Коммит
aeccd9b7e1
67
README.md
67
README.md
|
@ -219,39 +219,40 @@ The following is complete set of connector sink properties-
|
|||
| :--- | :--- | :--- | :--- |
|
||||
| 1 | connector.class | Classname of the Kusto sink | Hard code to ``` com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector ```<br>*Required* |
|
||||
| 2 | topics | Kafka topic specification | List of topics separated by commas<br>*Required* |
|
||||
| 3 | kusto.url | Kusto ingest cluster specification | Provide the ingest URI of your ADX cluster<br>*Required* |
|
||||
| 4 | aad.auth.authority | Credentials for Kusto | Provide the tenant ID of your Azure Active Directory<br>*Required* |
|
||||
| 5 | aad.auth.appid | Credentials for Kusto | Provide Azure Active Directory Service Principal Name<br>*Required* |
|
||||
| 6 | aad.auth.appkey | Credentials for Kusto | Provide Azure Active Directory Service Principal secret<br>*Required* |
|
||||
| 7 | kusto.tables.topics.mapping | Mapping of topics to tables | Provide 1..many topic-table comma-separated mappings as follows-<br>[{'topic': '\<topicName1\>','db': '\<datebaseName\>', 'table': '\<tableName\>','format': '<format-e.g.avro/csv/json>', 'mapping':'\<tableMappingName\>'}]<br>*Required* |
|
||||
| 8 | key.converter | Deserialization | One of the below supported-<br>org.apache.kafka.connect.storage.StringConverter<br> org.apache.kafka.connect.json.JsonConverter<br>io.confluent.connect.avro.AvroConverter<br>io.confluent.connect.json.JsonSchemaConverter<br> org.apache.kafka.connect.converters.ByteArrayConverter<br><br>*Required* |
|
||||
| 9 | value.converter | Deserialization | One of the below supported-<br>org.apache.kafka.connect.storage.StringConverter<br> org.apache.kafka.connect.json.JsonConverter<br>io.confluent.connect.avro.AvroConverter<br>io.confluent.connect.json.JsonSchemaConverter<br> org.apache.kafka.connect.converters.ByteArrayConverter<br><br>*Required* |
|
||||
| 10 | value.converter.schema.registry.url | Schema validation | URI of the Kafka schema registry<br>*Optional* |
|
||||
| 11 | value.converter.schemas.enable | Schema validation | Set to true if you have embedded schema with payload but are not leveraging the schema registry<br>Applicable for avro and json<br><br>*Optional* |
|
||||
| 12 | tasks.max | connector parallelism | Specify the number of connector copy/sink tasks<br>*Required* |
|
||||
| 13 | flush.size.bytes | Performance knob for batching | Maximum bufer byte size per topic+partition combination that in combination with flush.interval.ms (whichever is reached first) should result in sinking to Kusto<br>*Default - 1 MB*<br>*Required* |
|
||||
| 14 | flush.interval.ms | Performance knob for batching | Minimum time interval per topic+partition combo that in combination with flush.size.bytes (whichever is reached first) should result in sinking to Kusto<br>*Default - 300 ms*<br>*Required* |
|
||||
| 15 | tempdir.path | Local directory path on Kafka Connect worker to buffer files to before shipping to Kusto | Default is value returned by ```System.getProperty("java.io.tmpdir")``` with a GUID attached to it<br><br>*Optional* |
|
||||
| 16 | behavior.on.error | Configurable behavior in response to errors encountered | Possible values - log, ignore, fail<br><br>log - log the error, send record to dead letter queue, and continue processing<br>ignore - log the error, send record to dead letter queue, proceed with processing despite errors encountered<br>fail - shut down connector task upon encountering<br><br>*Default - fail*<br>*Optional* |
|
||||
| 17 | errors.retry.max.time.ms | Configurable retries for transient errors | Period of time in milliseconds to retry for transient errors<br><br>*Default - 300 ms*<br>*Optional* |
|
||||
| 18 | errors.retry.backoff.time.ms | Configurable retries for transient errors | Period of time in milliseconds to backoff before retry for transient errors<br><br>*Default - 10 ms*<br>*Optional* |
|
||||
| 19 | errors.deadletterqueue.bootstrap.servers | Channel to write records that failed deserialization | CSV or kafkaBroker:port <br>*Optional* |
|
||||
| 20 | errors.deadletterqueue.topic.name | Channel to write records that failed deserialization | Pre-created topic name <br>*Optional* |
|
||||
| 21 | errors.deadletterqueue.security.protocol | Channel to write records that failed deserialization | Securitry protocol of secure Kafka cluster <br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 22 | errors.deadletterqueue.sasl.mechanism | Channel to write records that failed deserialization | SASL mechanism of secure Kafka cluster<br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 23 | errors.deadletterqueue.sasl.jaas.config | Channel to write records that failed deserialization | JAAS config of secure Kafka cluster<br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 24 | misc.deadletterqueue.bootstrap.servers | Channel to write records that due to reasons other than deserialization | CSV of kafkaBroker:port <br>*Optional* |
|
||||
| 25 | misc.deadletterqueue.topic.name | Channel to write records that due to reasons other than deserialization | Pre-created topic name <br>*Optional* |
|
||||
| 26 | misc.deadletterqueue.security.protocol | Channel to write records that due to reasons other than deserialization | Securitry protocol of secure Kafka cluster <br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 27 | misc.deadletterqueue.sasl.mechanism | Channel to write records that due to reasons other than deserialization | SASL mechanism of secure Kafka cluster<br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 28 | misc.deadletterqueue.sasl.jaas.config | Channel to write records that due to reasons other than deserialization | JAAS config of secure Kafka cluster<br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 29 | consumer.override.bootstrap.servers | Security details explicitly required for secure Kafka clusters | Bootstrap server:port CSV of secure Kafka cluster <br>*Required for secure Kafka clusters* |
|
||||
| 30 | consumer.override.security.protocol | Security details explicitly required for secure Kafka clusters | Security protocol of secure Kafka cluster <br>*Required for secure Kafka clusters* |
|
||||
| 31 | consumer.override.sasl.mechanism | Security details explicitly required for secure Kafka clusters | SASL mechanism of secure Kafka cluster<br>*Required for secure Kafka clusters* |
|
||||
| 32 | consumer.override.sasl.jaas.config | Security details explicitly required for secure Kafka clusters | JAAS config of secure Kafka cluster<br>*Required for secure Kafka clusters* |
|
||||
| 33 | consumer.override.sasl.kerberos.service.name | Security details explicitly required for secure Kafka clusters, specifically kerberized Kafka | Kerberos service name of kerberized Kafka cluster<br>*Required for kerberized Kafka clusters* |
|
||||
| 34 | consumer.override.auto.offset.reset | Configurable consuming from offset | Possible values are - earliest or latest<br>*Optional* |
|
||||
| 35 | consumer.override.max.poll.interval.ms| Config to prevent duplication | Set to a value to avoid consumer leaving the group while the Connector is retrying <br>*Optional* |
|
||||
| 3 | kusto.url | Kusto ingest node specification | Provide the ingest URI of your ADX cluster<br>*Required* |
|
||||
| 4 | kusto.engine.url | Kusto engine node specification | Provide the engine URI of your ADX cluster<br>*Optional* |
|
||||
| 5 | aad.auth.authority | Credentials for Kusto | Provide the tenant ID of your Azure Active Directory<br>*Required* |
|
||||
| 6 | aad.auth.appid | Credentials for Kusto | Provide Azure Active Directory Service Principal Name<br>*Required* |
|
||||
| 7 | aad.auth.appkey | Credentials for Kusto | Provide Azure Active Directory Service Principal secret<br>*Required* |
|
||||
| 8 | kusto.tables.topics.mapping | Mapping of topics to tables | Provide 1..many topic-table comma-separated mappings as follows-<br>[{'topic': '\<topicName1\>','db': '\<datebaseName\>', 'table': '\<tableName\>','format': '<format-e.g.avro/csv/json>', 'mapping':'\<tableMappingName\>'}]<br>*Required* |
|
||||
| 9 | key.converter | Deserialization | One of the below supported-<br>org.apache.kafka.connect.storage.StringConverter<br> org.apache.kafka.connect.json.JsonConverter<br>io.confluent.connect.avro.AvroConverter<br>io.confluent.connect.json.JsonSchemaConverter<br> org.apache.kafka.connect.converters.ByteArrayConverter<br><br>*Required* |
|
||||
| 10 | value.converter | Deserialization | One of the below supported-<br>org.apache.kafka.connect.storage.StringConverter<br> org.apache.kafka.connect.json.JsonConverter<br>io.confluent.connect.avro.AvroConverter<br>io.confluent.connect.json.JsonSchemaConverter<br> org.apache.kafka.connect.converters.ByteArrayConverter<br><br>*Required* |
|
||||
| 11 | value.converter.schema.registry.url | Schema validation | URI of the Kafka schema registry<br>*Optional* |
|
||||
| 12 | value.converter.schemas.enable | Schema validation | Set to true if you have embedded schema with payload but are not leveraging the schema registry<br>Applicable for avro and json<br><br>*Optional* |
|
||||
| 13 | tasks.max | connector parallelism | Specify the number of connector copy/sink tasks<br>*Required* |
|
||||
| 14 | flush.size.bytes | Performance knob for batching | Maximum bufer byte size per topic+partition combination that in combination with flush.interval.ms (whichever is reached first) should result in sinking to Kusto<br>*Default - 1 MB*<br>*Required* |
|
||||
| 15 | flush.interval.ms | Performance knob for batching | Minimum time interval per topic+partition combo that in combination with flush.size.bytes (whichever is reached first) should result in sinking to Kusto<br>*Default - 300 ms*<br>*Required* |
|
||||
| 16 | tempdir.path | Local directory path on Kafka Connect worker to buffer files to before shipping to Kusto | Default is value returned by ```System.getProperty("java.io.tmpdir")``` with a GUID attached to it<br><br>*Optional* |
|
||||
| 17 | behavior.on.error | Configurable behavior in response to errors encountered | Possible values - log, ignore, fail<br><br>log - log the error, send record to dead letter queue, and continue processing<br>ignore - log the error, send record to dead letter queue, proceed with processing despite errors encountered<br>fail - shut down connector task upon encountering<br><br>*Default - fail*<br>*Optional* |
|
||||
| 18 | errors.retry.max.time.ms | Configurable retries for transient errors | Period of time in milliseconds to retry for transient errors<br><br>*Default - 300 ms*<br>*Optional* |
|
||||
| 19 | errors.retry.backoff.time.ms | Configurable retries for transient errors | Period of time in milliseconds to backoff before retry for transient errors<br><br>*Default - 10 ms*<br>*Optional* |
|
||||
| 20 | errors.deadletterqueue.bootstrap.servers | Channel to write records that failed deserialization | CSV or kafkaBroker:port <br>*Optional* |
|
||||
| 21 | errors.deadletterqueue.topic.name | Channel to write records that failed deserialization | Pre-created topic name <br>*Optional* |
|
||||
| 22 | errors.deadletterqueue.security.protocol | Channel to write records that failed deserialization | Securitry protocol of secure Kafka cluster <br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 23 | errors.deadletterqueue.sasl.mechanism | Channel to write records that failed deserialization | SASL mechanism of secure Kafka cluster<br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 24 | errors.deadletterqueue.sasl.jaas.config | Channel to write records that failed deserialization | JAAS config of secure Kafka cluster<br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 25 | misc.deadletterqueue.bootstrap.servers | Channel to write records that due to reasons other than deserialization | CSV of kafkaBroker:port <br>*Optional* |
|
||||
| 26 | misc.deadletterqueue.topic.name | Channel to write records that due to reasons other than deserialization | Pre-created topic name <br>*Optional* |
|
||||
| 27 | misc.deadletterqueue.security.protocol | Channel to write records that due to reasons other than deserialization | Securitry protocol of secure Kafka cluster <br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 28 | misc.deadletterqueue.sasl.mechanism | Channel to write records that due to reasons other than deserialization | SASL mechanism of secure Kafka cluster<br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 29 | misc.deadletterqueue.sasl.jaas.config | Channel to write records that due to reasons other than deserialization | JAAS config of secure Kafka cluster<br>*Optional but when feature is used with secure cluster, is required* |
|
||||
| 30 | consumer.override.bootstrap.servers | Security details explicitly required for secure Kafka clusters | Bootstrap server:port CSV of secure Kafka cluster <br>*Required for secure Kafka clusters* |
|
||||
| 31 | consumer.override.security.protocol | Security details explicitly required for secure Kafka clusters | Security protocol of secure Kafka cluster <br>*Required for secure Kafka clusters* |
|
||||
| 32 | consumer.override.sasl.mechanism | Security details explicitly required for secure Kafka clusters | SASL mechanism of secure Kafka cluster<br>*Required for secure Kafka clusters* |
|
||||
| 33 | consumer.override.sasl.jaas.config | Security details explicitly required for secure Kafka clusters | JAAS config of secure Kafka cluster<br>*Required for secure Kafka clusters* |
|
||||
| 34 | consumer.override.sasl.kerberos.service.name | Security details explicitly required for secure Kafka clusters, specifically kerberized Kafka | Kerberos service name of kerberized Kafka cluster<br>*Required for kerberized Kafka clusters* |
|
||||
| 35 | consumer.override.auto.offset.reset | Configurable consuming from offset | Possible values are - earliest or latest<br>*Optional* |
|
||||
| 36 | consumer.override.max.poll.interval.ms| Config to prevent duplication | Set to a value to avoid consumer leaving the group while the Connector is retrying <br>*Optional* |
|
||||
|
||||
<hr>
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ tasks.max=1
|
|||
#topics=testing1,testing2
|
||||
|
||||
#kusto.url=https://ingest-{cluster}.kusto.windows.net/
|
||||
#kusto.engine.url=https://{cluster}.kusto.windows.net/
|
||||
|
||||
#kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping'}]
|
||||
|
||||
|
|
|
@ -38,10 +38,14 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
}
|
||||
|
||||
// TODO: this might need to be per kusto cluster...
|
||||
static final String KUSTO_URL_CONF = "kusto.url";
|
||||
static final String KUSTO_URL_CONF = "kusto.url"; // TODO [yischoen 2020-09-29]: Next major version bump, change to kusto.ingest.url (and update README+docs)
|
||||
private static final String KUSTO_URL_DOC = "Kusto ingestion service URI.";
|
||||
private static final String KUSTO_URL_DISPLAY = "Kusto cluster ingestion URI";
|
||||
|
||||
static final String KUSTO_ENGINE_URL_CONF = "kusto.engine.url";
|
||||
private static final String KUSTO_ENGINE_URL_DOC = "Kusto engine service URI.";
|
||||
private static final String KUSTO_ENGINE_URL_DISPLAY = "Kusto cluster engine URI";
|
||||
|
||||
static final String KUSTO_AUTH_APPID_CONF = "aad.auth.appid";
|
||||
private static final String KUSTO_AUTH_APPID_DOC = "Application Id for Azure Active Directory authentication.";
|
||||
private static final String KUSTO_AUTH_APPID_DISPLAY = "Kusto Auth AppID";
|
||||
|
@ -93,7 +97,7 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "misc.deadletterqueue.bootstrap.servers";
|
||||
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC = "Configure this list to Kafka broker's address(es) "
|
||||
+ "to which the Connector should write records failed due to restrictions while writing to the file in `tempdir.path`, network interruptions or unavailability of Kusto cluster. "
|
||||
+ "This list should be in the form host-1:port-1,host-2:port-2,…host-n:port-n. ";
|
||||
+ "This list should be in the form host-1:port-1,host-2:port-2,…host-n:port-n.";
|
||||
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Miscellaneous Dead-Letter Queue Bootstrap Servers";
|
||||
|
||||
static final String KUSTO_DLQ_TOPIC_NAME_CONF = "misc.deadletterqueue.topic.name";
|
||||
|
@ -102,7 +106,7 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Miscellaneous Dead-Letter Queue Topic Name";
|
||||
|
||||
static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "errors.retry.max.time.ms";
|
||||
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DOC = "Maximum time upto which the Connector "
|
||||
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DOC = "Maximum time up to which the Connector "
|
||||
+ "should retry writing records to Kusto table in case of failures.";
|
||||
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DISPLAY = "Errors Maximum Retry Time";
|
||||
|
||||
|
@ -120,18 +124,16 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
}
|
||||
|
||||
public static ConfigDef getConfig() {
|
||||
|
||||
ConfigDef result = new ConfigDef();
|
||||
|
||||
defineConnectionConfigs(result);
|
||||
defineWriteConfigs(result);
|
||||
defineErrorHandlingAndRetriesConfgis(result);
|
||||
defineErrorHandlingAndRetriesConfigs(result);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private static void defineErrorHandlingAndRetriesConfgis(ConfigDef result) {
|
||||
|
||||
private static void defineErrorHandlingAndRetriesConfigs(ConfigDef result) {
|
||||
final String errorAndRetriesGroupName = "Error Handling and Retries";
|
||||
int errorAndRetriesGroupOrder = 0;
|
||||
|
||||
|
@ -193,7 +195,6 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
}
|
||||
|
||||
private static void defineWriteConfigs(ConfigDef result) {
|
||||
|
||||
final String writeGroupName = "Writes";
|
||||
int writeGroupOrder = 0;
|
||||
|
||||
|
@ -243,7 +244,6 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
}
|
||||
|
||||
private static void defineConnectionConfigs(ConfigDef result) {
|
||||
|
||||
final String connectionGroupName = "Connection";
|
||||
int connectionGroupOrder = 0;
|
||||
|
||||
|
@ -258,6 +258,16 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
connectionGroupOrder++,
|
||||
Width.MEDIUM,
|
||||
KUSTO_URL_DISPLAY)
|
||||
.define(
|
||||
KUSTO_ENGINE_URL_CONF,
|
||||
Type.STRING,
|
||||
"", // TODO [yischoen 2020-09-29]: Next major version bump, set to ConfigDef.NO_DEFAULT_VALUE to make it a required setting
|
||||
Importance.LOW,
|
||||
KUSTO_ENGINE_URL_DOC,
|
||||
connectionGroupName,
|
||||
connectionGroupOrder++,
|
||||
Width.MEDIUM,
|
||||
KUSTO_ENGINE_URL_DISPLAY)
|
||||
.define(
|
||||
KUSTO_AUTH_APPKEY_CONF,
|
||||
Type.PASSWORD,
|
||||
|
@ -294,6 +304,13 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
return this.getString(KUSTO_URL_CONF);
|
||||
}
|
||||
|
||||
// TODO [yischoen 2020-09-29]: Next major version bump, just return the value, as it will be a required setting
|
||||
public String getKustoEngineUrl() {
|
||||
if (!Strings.isNullOrEmpty(this.getString(KUSTO_ENGINE_URL_CONF)))
|
||||
return this.getString(KUSTO_ENGINE_URL_CONF);
|
||||
return this.getString(KUSTO_URL_CONF).replaceFirst("(?i)ingest-", "");
|
||||
}
|
||||
|
||||
public String getAuthAppid() {
|
||||
return this.getString(KUSTO_AUTH_APPID_CONF);
|
||||
}
|
||||
|
@ -368,5 +385,4 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
public static void main(String[] args) {
|
||||
System.out.println(getConfig().toEnrichedRst());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -8,9 +8,9 @@ import com.microsoft.azure.kusto.data.KustoOperationResult;
|
|||
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClient;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionMapping;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionProperties;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
|
@ -27,14 +27,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Properties;
|
||||
import java.util.*;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -43,14 +36,20 @@ import java.util.Properties;
|
|||
* Currently only ingested files are "committed" in the sense that we can advance the offset according to it.
|
||||
*/
|
||||
public class KustoSinkTask extends SinkTask {
|
||||
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
|
||||
|
||||
|
||||
static final String FETCH_TABLE_QUERY = "%s|count";
|
||||
static final String FETCH_TABLE_MAPPING_QUERY = ".show table %s ingestion %s mapping '%s'";
|
||||
static final String FETCH_PRINCIPAL_ROLES_QUERY = ".show principal access with (principal = '%s', accesstype='ingest',database='%s',table='%s')";
|
||||
static final int INGESTION_ALLOWED_INDEX = 3;
|
||||
|
||||
private static final String MAPPING = "mapping";
|
||||
public static final String MAPPING_FORMAT = "format";
|
||||
public static final String MAPPING_TABLE = "table";
|
||||
public static final String MAPPING_DB = "db";
|
||||
public static final String JSON_FORMAT = "json";
|
||||
public static final String MULTIJSON_FORMAT = "multijson";
|
||||
|
||||
private final Set<TopicPartition> assignment;
|
||||
private Map<String, TopicIngestionProperties> topicsToIngestionProps;
|
||||
private KustoSinkConfig config;
|
||||
|
@ -96,7 +95,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
|
||||
public static Client createKustoEngineClient(KustoSinkConfig config) {
|
||||
try {
|
||||
String engineClientURL = config.getKustoUrl().replace("https://ingest-", "https://");
|
||||
String engineClientURL = config.getKustoEngineUrl();
|
||||
if (!Strings.isNullOrEmpty(config.getAuthAppid())) {
|
||||
if (Strings.isNullOrEmpty(config.getAuthAppkey())) {
|
||||
throw new ConfigException("Kusto authentication missing App Key.");
|
||||
|
@ -121,50 +120,46 @@ public class KustoSinkTask extends SinkTask {
|
|||
}
|
||||
|
||||
public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(KustoSinkConfig config) {
|
||||
Map<String, TopicIngestionProperties> result = new HashMap<>();
|
||||
Map<String, TopicIngestionProperties> result = new HashMap<>();
|
||||
|
||||
try {
|
||||
|
||||
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
|
||||
|
||||
for (int i =0; i< mappings.length(); i++) {
|
||||
|
||||
|
||||
for (int i = 0; i < mappings.length(); i++) {
|
||||
JSONObject mapping = mappings.getJSONObject(i);
|
||||
|
||||
String db = mapping.getString("db");
|
||||
String table = mapping.getString("table");
|
||||
|
||||
String format = mapping.optString("format");
|
||||
|
||||
String db = mapping.getString(MAPPING_DB);
|
||||
String table = mapping.getString(MAPPING_TABLE);
|
||||
|
||||
String format = mapping.optString(MAPPING_FORMAT);
|
||||
|
||||
IngestionProperties props = new IngestionProperties(db, table);
|
||||
|
||||
|
||||
if (format != null && !format.isEmpty()) {
|
||||
if (format.equals("json") || format.equals("singlejson") || format.equalsIgnoreCase("multijson")) {
|
||||
props.setDataFormat("multijson");
|
||||
if (format.equals(JSON_FORMAT) || format.equals("singlejson") || format.equalsIgnoreCase(MULTIJSON_FORMAT)) {
|
||||
props.setDataFormat(MULTIJSON_FORMAT);
|
||||
}
|
||||
props.setDataFormat(format);
|
||||
}
|
||||
|
||||
|
||||
if (format != null && !format.isEmpty()) {
|
||||
if (format.equals("json") || format.equals("singlejson")){
|
||||
props.setDataFormat("multijson");
|
||||
if (format.equals("json") || format.equals("singlejson")) {
|
||||
props.setDataFormat(MULTIJSON_FORMAT);
|
||||
}
|
||||
props.setDataFormat(format);
|
||||
}
|
||||
|
||||
|
||||
String mappingRef = mapping.optString("mapping");
|
||||
|
||||
if (mappingRef != null && !mappingRef.isEmpty()) {
|
||||
if (format != null) {
|
||||
if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.json.toString())){
|
||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
|
||||
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())){
|
||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
|
||||
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.apacheavro.toString())){
|
||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.ApacheAvro);
|
||||
} else {
|
||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Csv);
|
||||
}
|
||||
|
||||
if (mappingRef != null && !mappingRef.isEmpty() && format != null) {
|
||||
if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.json.toString())) {
|
||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
|
||||
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())) {
|
||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
|
||||
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.apacheavro.toString())) {
|
||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.ApacheAvro);
|
||||
} else {
|
||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Csv);
|
||||
}
|
||||
}
|
||||
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
|
||||
|
@ -172,8 +167,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
result.put(mapping.getString("topic"), topicIngestionProperties);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
catch (Exception ex) {
|
||||
} catch (Exception ex) {
|
||||
throw new ConfigException("Error while parsing kusto ingestion properties.", ex);
|
||||
}
|
||||
}
|
||||
|
@ -189,31 +183,27 @@ public class KustoSinkTask extends SinkTask {
|
|||
Client engineClient = createKustoEngineClient(config);
|
||||
if (config.getTopicToTableMapping() != null) {
|
||||
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
|
||||
if(mappings.length() > 0) {
|
||||
if(isIngestorRole(mappings.getJSONObject(0), engineClient)) {
|
||||
for (int i = 0; i < mappings.length(); i++) {
|
||||
JSONObject mapping = mappings.getJSONObject(i);
|
||||
validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList);
|
||||
}
|
||||
if ((mappings.length() > 0) && (isIngestorRole(mappings.getJSONObject(0), engineClient))) {
|
||||
for (int i = 0; i < mappings.length(); i++) {
|
||||
JSONObject mapping = mappings.getJSONObject(i);
|
||||
validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList);
|
||||
}
|
||||
}
|
||||
}
|
||||
String tableAccessErrorMessage = "";
|
||||
|
||||
if(!databaseTableErrorList.isEmpty())
|
||||
{
|
||||
if (!databaseTableErrorList.isEmpty()) {
|
||||
tableAccessErrorMessage = "\n\nError occurred while trying to access the following database:table\n" +
|
||||
String.join("\n",databaseTableErrorList);
|
||||
String.join("\n", databaseTableErrorList);
|
||||
}
|
||||
if(!accessErrorList.isEmpty())
|
||||
{
|
||||
if (!accessErrorList.isEmpty()) {
|
||||
tableAccessErrorMessage = tableAccessErrorMessage + "\n\nUser does not have appropriate permissions " +
|
||||
"to sink data into the Kusto database:table combination(s). " +
|
||||
"Verify your Kusto principals and roles before proceeding for the following: \n " +
|
||||
String.join("\n",accessErrorList);
|
||||
String.join("\n", accessErrorList);
|
||||
}
|
||||
|
||||
if(!tableAccessErrorMessage.isEmpty()) {
|
||||
if (!tableAccessErrorMessage.isEmpty()) {
|
||||
throw new ConnectException(tableAccessErrorMessage);
|
||||
}
|
||||
} catch (JSONException e) {
|
||||
|
@ -223,11 +213,11 @@ public class KustoSinkTask extends SinkTask {
|
|||
|
||||
private boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException {
|
||||
String database = testMapping.getString("db");
|
||||
String table = testMapping.getString("table");
|
||||
String table = testMapping.getString(MAPPING_TABLE);
|
||||
try {
|
||||
KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table));
|
||||
} catch(DataServiceException | DataClientException err){
|
||||
if(err.getCause().getMessage().contains("Forbidden:")){
|
||||
engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table));
|
||||
} catch (DataServiceException | DataClientException err) {
|
||||
if (err.getCause().getMessage().contains("Forbidden:")) {
|
||||
log.warn("User might have ingestor privileges, table validation will be skipped for all table mappings ");
|
||||
return false;
|
||||
}
|
||||
|
@ -236,18 +226,19 @@ public class KustoSinkTask extends SinkTask {
|
|||
}
|
||||
|
||||
/**
|
||||
* This function validates whether the user has the read and write access to the intended table
|
||||
* before starting to sink records into ADX.
|
||||
* This function validates whether the user has the read and write access to the intended table
|
||||
* before starting to sink records into ADX.
|
||||
*
|
||||
* @param engineClient Client connection to run queries.
|
||||
* @param mapping JSON Object containing a Table mapping.
|
||||
* @param config
|
||||
* @param mapping JSON Object containing a Table mapping.
|
||||
* @param config Kusto Sink configuration
|
||||
*/
|
||||
private static void validateTableAccess(Client engineClient, JSONObject mapping, KustoSinkConfig config, List<String> databaseTableErrorList, List<String> accessErrorList) throws JSONException {
|
||||
|
||||
String database = mapping.getString("db");
|
||||
String table = mapping.getString("table");
|
||||
String format = mapping.getString("format");
|
||||
String mappingName = mapping.getString("mapping");
|
||||
|
||||
String database = mapping.getString(MAPPING_DB);
|
||||
String table = mapping.getString(MAPPING_TABLE);
|
||||
String format = mapping.getString(MAPPING_FORMAT);
|
||||
String mappingName = mapping.getString(MAPPING);
|
||||
boolean hasAccess = false;
|
||||
try {
|
||||
try {
|
||||
|
@ -259,22 +250,19 @@ public class KustoSinkTask extends SinkTask {
|
|||
} catch (DataServiceException e) {
|
||||
databaseTableErrorList.add(String.format("Database:%s Table:%s | table not found", database, table));
|
||||
}
|
||||
if(hasAccess) {
|
||||
if (hasAccess) {
|
||||
try {
|
||||
KustoOperationResult rp = engineClient.execute(database, String.format(FETCH_TABLE_MAPPING_QUERY, table, format, mappingName));
|
||||
if (rp.getPrimaryResults().getData().get(0).get(0).toString().equals(mappingName)) {
|
||||
hasAccess = true;
|
||||
}
|
||||
engineClient.execute(database, String.format(FETCH_TABLE_MAPPING_QUERY, table, format, mappingName));
|
||||
} catch (DataServiceException e) {
|
||||
hasAccess = false;
|
||||
databaseTableErrorList.add(String.format("Database:%s Table:%s | %s mapping '%s' not found", database, table, format, mappingName));
|
||||
|
||||
}
|
||||
}
|
||||
if(hasAccess) {
|
||||
if (hasAccess) {
|
||||
String authenticateWith = "aadapp=" + config.getAuthAppid();
|
||||
String query = String.format(FETCH_PRINCIPAL_ROLES_QUERY, authenticateWith, database, table);
|
||||
try {
|
||||
String authenticateWith = "aadapp=" + config.getAuthAppid();
|
||||
KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_PRINCIPAL_ROLES_QUERY, authenticateWith, database, table));
|
||||
KustoOperationResult rs = engineClient.execute(database, query);
|
||||
hasAccess = (boolean) rs.getPrimaryResults().getData().get(0).get(INGESTION_ALLOWED_INDEX);
|
||||
if (hasAccess) {
|
||||
log.info("User has appropriate permissions to sink data into the Kusto table={}", table);
|
||||
|
@ -284,7 +272,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
}
|
||||
} catch (DataServiceException e) {
|
||||
// Logging the error so that the trace is not lost.
|
||||
log.error("{}", e);
|
||||
log.error("Error fetching principal roles with query {}", query, e);
|
||||
databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table));
|
||||
}
|
||||
}
|
||||
|
@ -299,11 +287,11 @@ public class KustoSinkTask extends SinkTask {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void open(Collection<TopicPartition> partitions) throws ConnectException {
|
||||
public void open(Collection<TopicPartition> partitions) {
|
||||
assignment.addAll(partitions);
|
||||
for (TopicPartition tp : assignment) {
|
||||
TopicIngestionProperties ingestionProps = getIngestionProps(tp.topic());
|
||||
log.debug(String.format("Open Kusto topic: '%s' with partition: '%s'", tp.topic(), tp.partition()));
|
||||
log.debug("Open Kusto topic: '{}' with partition: '{}'", tp.topic(), tp.partition());
|
||||
if (ingestionProps == null) {
|
||||
throw new ConnectException(String.format("Kusto Sink has no ingestion props mapped " +
|
||||
"for the topic: %s. please check your configuration.", tp.topic()));
|
||||
|
@ -323,17 +311,16 @@ public class KustoSinkTask extends SinkTask {
|
|||
writers.remove(tp);
|
||||
assignment.remove(tp);
|
||||
} catch (ConnectException e) {
|
||||
log.error("Error closing writer for {}. Error: {}", tp, e);
|
||||
log.error("Error closing writer for {}.", tp, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(Map<String, String> props) {
|
||||
|
||||
config = new KustoSinkConfig(props);
|
||||
String url = config.getKustoUrl();
|
||||
|
||||
|
||||
validateTableMappings(config);
|
||||
if (config.isDlqEnabled()) {
|
||||
isDlqEnabled = true;
|
||||
|
@ -351,16 +338,15 @@ public class KustoSinkTask extends SinkTask {
|
|||
isDlqEnabled = false;
|
||||
dlqTopicName = null;
|
||||
}
|
||||
|
||||
|
||||
topicsToIngestionProps = getTopicsToIngestionProps(config);
|
||||
|
||||
|
||||
// this should be read properly from settings
|
||||
kustoIngestClient = createKustoIngestClient(config);
|
||||
|
||||
log.info(String.format("Started KustoSinkTask with target cluster: (%s), source topics: (%s)",
|
||||
url, topicsToIngestionProps.keySet().toString()));
|
||||
|
||||
log.info("Started KustoSinkTask with target cluster: ({}), source topics: ({})", url, topicsToIngestionProps.keySet());
|
||||
// Adding this check to make code testable
|
||||
if(context!=null) {
|
||||
if (context != null) {
|
||||
open(context.assignment());
|
||||
}
|
||||
}
|
||||
|
@ -372,7 +358,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
writer.close();
|
||||
}
|
||||
try {
|
||||
if(kustoIngestClient != null) {
|
||||
if (kustoIngestClient != null) {
|
||||
kustoIngestClient.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -381,10 +367,10 @@ public class KustoSinkTask extends SinkTask {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void put(Collection<SinkRecord> records) throws ConnectException {
|
||||
public void put(Collection<SinkRecord> records) {
|
||||
SinkRecord lastRecord = null;
|
||||
for (SinkRecord record : records) {
|
||||
log.debug("record to topic:" + record.topic());
|
||||
log.debug("Record to topic: {}", record.topic());
|
||||
|
||||
lastRecord = record;
|
||||
TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
|
||||
|
@ -401,7 +387,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
}
|
||||
|
||||
if (lastRecord != null) {
|
||||
log.debug("Last record offset:" + lastRecord.kafkaOffset());
|
||||
log.debug("Last record offset: {}", lastRecord.kafkaOffset());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -413,7 +399,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
) {
|
||||
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
|
||||
for (TopicPartition tp : assignment) {
|
||||
if(writers.get(tp) == null) {
|
||||
if (writers.get(tp) == null) {
|
||||
throw new ConnectException("Topic Partition not configured properly. " +
|
||||
"verify your `topics` and `kusto.tables.topics.mapping` configurations");
|
||||
}
|
||||
|
@ -421,7 +407,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
Long lastCommittedOffset = writers.get(tp).lastCommittedOffset;
|
||||
|
||||
if (lastCommittedOffset != null) {
|
||||
Long offset = lastCommittedOffset + 1L;
|
||||
long offset = lastCommittedOffset + 1L;
|
||||
log.debug("Forwarding to framework request to commit offset: {} for {} while the offset is {}", offset, tp, offsets.get(tp));
|
||||
offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
|
||||
}
|
||||
|
@ -433,6 +419,5 @@ public class KustoSinkTask extends SinkTask {
|
|||
@Override
|
||||
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
|
||||
// do nothing , rolling files can handle writing
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,32 +8,17 @@ import org.apache.commons.io.IOUtils;
|
|||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.sink.SinkRecord;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.*;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Objects;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.AbstractMap;
|
||||
|
||||
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
|
||||
public class FileWriterTest {
|
||||
private File currentDirectory;
|
||||
|
@ -68,7 +53,7 @@ public class FileWriterTest {
|
|||
boolean mkdirs = folder.mkdirs();
|
||||
Assert.assertTrue(mkdirs);
|
||||
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
|
||||
|
||||
final String FILE_PATH = Paths.get(path, "ABC").toString();
|
||||
final int MAX_FILE_SIZE = 128;
|
||||
|
@ -82,8 +67,8 @@ public class FileWriterTest {
|
|||
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, msg.getBytes(), 10);
|
||||
fileWriter.initializeRecordWriter(record);
|
||||
fileWriter.openFile(null);
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
|
||||
Assert.assertEquals(fileWriter.currentFile.rawBytes, 0);
|
||||
Assert.assertEquals(1, Objects.requireNonNull(folder.listFiles()).length);
|
||||
Assert.assertEquals(0, fileWriter.currentFile.rawBytes);
|
||||
Assert.assertEquals(fileWriter.currentFile.path, FILE_PATH);
|
||||
Assert.assertTrue(fileWriter.currentFile.file.canWrite());
|
||||
|
||||
|
@ -98,7 +83,7 @@ public class FileWriterTest {
|
|||
boolean mkdirs = folder.mkdirs();
|
||||
Assert.assertTrue(mkdirs);
|
||||
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
|
||||
|
||||
HashMap<String, Long> files = new HashMap<>();
|
||||
|
||||
|
@ -116,21 +101,21 @@ public class FileWriterTest {
|
|||
fileWriter.writeData(record1);
|
||||
}
|
||||
|
||||
Assert.assertEquals(files.size(), 4);
|
||||
Assert.assertEquals(4, files.size());
|
||||
|
||||
// should still have 1 open file at this point...
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
|
||||
Assert.assertEquals(1, Objects.requireNonNull(folder.listFiles()).length);
|
||||
|
||||
// close current file
|
||||
fileWriter.close();
|
||||
Assert.assertEquals(files.size(), 5);
|
||||
Assert.assertEquals(5, files.size());
|
||||
|
||||
List<Long> sortedFiles = new ArrayList<>(files.values());
|
||||
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
|
||||
Assert.assertEquals(sortedFiles, Arrays.asList((long) 108, (long) 108, (long) 108, (long) 108, (long) 54));
|
||||
|
||||
// make sure folder is clear once done
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -157,9 +142,9 @@ public class FileWriterTest {
|
|||
|
||||
Thread.sleep(1000);
|
||||
|
||||
Assert.assertEquals(files.size(), 0);
|
||||
Assert.assertEquals(0, files.size());
|
||||
fileWriter.close();
|
||||
Assert.assertEquals(files.size(), 1);
|
||||
Assert.assertEquals(1, files.size());
|
||||
|
||||
String path2 = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2_2").toString();
|
||||
File folder2 = new File(path2);
|
||||
|
@ -175,7 +160,7 @@ public class FileWriterTest {
|
|||
fileWriter2.writeData(record1);
|
||||
Thread.sleep(1050);
|
||||
|
||||
Assert.assertEquals(files.size(), 2);
|
||||
Assert.assertEquals(2, files.size());
|
||||
|
||||
List<Long> sortedFiles = new ArrayList<>(files.values());
|
||||
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
|
||||
|
@ -183,7 +168,7 @@ public class FileWriterTest {
|
|||
|
||||
// make sure folder is clear once done
|
||||
fileWriter2.close();
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -212,7 +197,7 @@ public class FileWriterTest {
|
|||
boolean mkdirs = folder.mkdirs();
|
||||
Assert.assertTrue(mkdirs);
|
||||
Function<Long, String> generateFileName = (Long offset) -> {
|
||||
if(offset == null){
|
||||
if (offset == null) {
|
||||
offset = offsets.currentOffset;
|
||||
}
|
||||
return Paths.get(path, Long.toString(offset)).toString();
|
||||
|
@ -247,16 +232,19 @@ public class FileWriterTest {
|
|||
Thread.sleep(510);
|
||||
|
||||
// Assertions
|
||||
Assert.assertEquals(files.size(), 2);
|
||||
Assert.assertEquals(2, files.size());
|
||||
|
||||
// Make sure that the first file is from offset 1 till 2 and second is from 3 till 3
|
||||
Assert.assertEquals(files.stream().map(Map.Entry::getValue).toArray(Long[]::new), new Long[]{30L, 15L});
|
||||
Assert.assertEquals(files.stream().map((s)->s.getKey().substring(path.length() + 1)).toArray(String[]::new), new String[]{"1", "3"});
|
||||
Assert.assertEquals(committedOffsets, new ArrayList<Long>(){{add(2L);add(3L);}});
|
||||
Assert.assertEquals(new Long[]{30L, 15L}, files.stream().map(Map.Entry::getValue).toArray(Long[]::new));
|
||||
Assert.assertEquals(new String[]{"1", "3"}, files.stream().map((s) -> s.getKey().substring(path.length() + 1)).toArray(String[]::new));
|
||||
Assert.assertEquals(committedOffsets, new ArrayList<Long>() {{
|
||||
add(2L);
|
||||
add(3L);
|
||||
}});
|
||||
|
||||
// make sure folder is clear once done
|
||||
fileWriter2.close();
|
||||
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
|
||||
Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
|
||||
}
|
||||
|
||||
static Function<SourceFile, String> getAssertFileConsumerFunction(String msg) {
|
||||
|
@ -289,14 +277,15 @@ public class FileWriterTest {
|
|||
}
|
||||
|
||||
protected Map<String, String> getProperties() {
|
||||
Map<String, String> settings = new HashMap<>();
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "xxx");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
return settings;
|
||||
Map<String, String> settings = new HashMap<>();
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "xxx");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
return settings;
|
||||
}
|
||||
|
||||
static Consumer<SourceFile> getAssertFileConsumer(String msg) {
|
||||
return (SourceFile f) -> {
|
||||
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
|
||||
|
@ -324,4 +313,4 @@ public class FileWriterTest {
|
|||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,57 +1,35 @@
|
|||
package com.microsoft.azure.kusto.kafka.connect.sink;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
|
||||
public class KustoSinkConnectorConfigTest {
|
||||
Map<String, String> settings;
|
||||
KustoSinkConfig config;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
settings = new HashMap<>();
|
||||
config = null;
|
||||
}
|
||||
private static final String ENGINE_URI = "https://cluster_name.kusto.windows.net";
|
||||
private static final String DM_URI = "https://ingest-cluster_name.kusto.windows.net";
|
||||
|
||||
@Test
|
||||
public void shouldAcceptValidConfig() {
|
||||
// Adding required Configuration with no default value.
|
||||
settings.put("kusto.tables.topics.mapping","[{'topic': 'xxx','db': 'xxx', 'table': 'xxx','format': 'avro', 'mapping':'avri'}]");
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
config = new KustoSinkConfig(settings);
|
||||
KustoSinkConfig config = new KustoSinkConfig(setupConfigs());
|
||||
assertNotNull(config);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldHaveDefaultValues() {
|
||||
// Adding required Configuration with no default value.
|
||||
settings.put("kusto.tables.topics.mapping","[{'topic': 'xxx','db': 'xxx', 'table': 'xxx','format': 'avro', 'mapping':'avri'}]");
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
config = new KustoSinkConfig(settings);
|
||||
KustoSinkConfig config = new KustoSinkConfig(setupConfigs());
|
||||
assertNotNull(config.getKustoUrl());
|
||||
assertNotNull(config.getFlushSizeBytes());
|
||||
assertNotNull(config.getFlushInterval());
|
||||
assertNotEquals(0, config.getFlushSizeBytes());
|
||||
assertNotEquals(0, config.getFlushInterval());
|
||||
assertFalse(config.isDlqEnabled());
|
||||
assertEquals(BehaviorOnError.FAIL, config.getBehaviorOnError());
|
||||
}
|
||||
|
@ -59,45 +37,70 @@ public class KustoSinkConnectorConfigTest {
|
|||
@Test(expected = ConfigException.class)
|
||||
public void shouldThrowExceptionWhenKustoURLNotGiven() {
|
||||
// Adding required Configuration with no default value.
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.remove(KustoSinkConfig.KUSTO_URL_CONF);
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
config = new KustoSinkConfig(settings);
|
||||
new KustoSinkConfig(settings);
|
||||
}
|
||||
|
||||
|
||||
// TODO [yischoen 2020-09-29]: Next major version bump we will make EngineUrl required, and the following tests will be irrelevant
|
||||
@Test
|
||||
public void shouldGuessKustoEngineUrlWhenNotGiven() {
|
||||
KustoSinkConfig config = new KustoSinkConfig(setupConfigs());
|
||||
String kustoEngineUrl = config.getKustoEngineUrl();
|
||||
assertEquals(ENGINE_URI, kustoEngineUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGuessKustoEngineUrlWhenNotGivenPrivateCase() {
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "https://private-ingest-cluster_name.kusto.windows.net");
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
String kustoEngineUrl = config.getKustoEngineUrl();
|
||||
assertEquals("https://private-cluster_name.kusto.windows.net", kustoEngineUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseDmUrlWhenKustoEngineUrlWhenNotGivenAndCantGuess() {
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, ENGINE_URI);
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
String kustoEngineUrl = config.getKustoEngineUrl();
|
||||
assertEquals(ENGINE_URI, kustoEngineUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseKustoEngineUrlWhenGiven() {
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, ENGINE_URI);
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
String kustoEngineUrl = config.getKustoEngineUrl();
|
||||
assertEquals(ENGINE_URI, kustoEngineUrl);
|
||||
}
|
||||
|
||||
@Test(expected = ConfigException.class)
|
||||
public void shouldThrowExceptionWhenAppIdNotGiven() {
|
||||
// Adding required Configuration with no default value.
|
||||
settings.remove(KustoSinkConfig.KUSTO_URL_CONF);
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
config = new KustoSinkConfig(settings);
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.remove(KustoSinkConfig.KUSTO_AUTH_APPID_CONF);
|
||||
new KustoSinkConfig(settings);
|
||||
}
|
||||
|
||||
|
||||
@Test(expected = ConfigException.class)
|
||||
public void shouldFailWhenBehaviorOnErrorIsIllConfigured() {
|
||||
// Adding required Configuration with no default value.
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.remove(KustoSinkConfig.KUSTO_URL_CONF);
|
||||
settings.put(KustoSinkConfig.KUSTO_BEHAVIOR_ON_ERROR_CONF, "DummyValue");
|
||||
config = new KustoSinkConfig(settings);
|
||||
new KustoSinkConfig(settings);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void verifyDlqSettings() {
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.put(KustoSinkConfig.KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF, "localhost:8081,localhost:8082");
|
||||
settings.put(KustoSinkConfig.KUSTO_DLQ_TOPIC_NAME_CONF, "dlq-error-topic");
|
||||
config = new KustoSinkConfig(settings);
|
||||
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
|
||||
assertTrue(config.isDlqEnabled());
|
||||
assertEquals(Arrays.asList("localhost:8081", "localhost:8082"), config.getDlqBootstrapServers());
|
||||
assertEquals("dlq-error-topic", config.getDlqTopicName());
|
||||
|
@ -106,16 +109,11 @@ public class KustoSinkConnectorConfigTest {
|
|||
@Test
|
||||
public void shouldProcessDlqConfigsWithPrefix() {
|
||||
// Adding required Configuration with no default value.
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.put("misc.deadletterqueue.security.protocol", "SASL_PLAINTEXT");
|
||||
settings.put("misc.deadletterqueue.sasl.mechanism", "PLAIN");
|
||||
|
||||
config = new KustoSinkConfig(settings);
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
|
||||
assertNotNull(config);
|
||||
|
||||
|
@ -125,4 +123,13 @@ public class KustoSinkConnectorConfigTest {
|
|||
assertEquals("PLAIN", dlqProps.get("sasl.mechanism"));
|
||||
}
|
||||
|
||||
}
|
||||
public static HashMap<String, String> setupConfigs() {
|
||||
HashMap<String, String> configs = new HashMap<>();
|
||||
configs.put(KustoSinkConfig.KUSTO_URL_CONF, DM_URI);
|
||||
configs.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
|
||||
configs.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
configs.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
configs.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
return configs;
|
||||
}
|
||||
}
|
|
@ -17,11 +17,11 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
|
||||
public class KustoSinkTaskTest {
|
||||
File currentDirectory;
|
||||
|
||||
|
@ -40,19 +40,12 @@ public class KustoSinkTaskTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSinkTaskOpen() throws Exception {
|
||||
HashMap<String, String> props = new HashMap<>();
|
||||
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
|
||||
|
||||
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
public void testSinkTaskOpen() {
|
||||
HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
|
||||
kustoSinkTaskSpy.start(props);
|
||||
kustoSinkTaskSpy.start(configs);
|
||||
ArrayList<TopicPartition> tps = new ArrayList<>();
|
||||
tps.add(new TopicPartition("topic1", 1));
|
||||
tps.add(new TopicPartition("topic1", 2));
|
||||
|
@ -60,23 +53,16 @@ public class KustoSinkTaskTest {
|
|||
|
||||
kustoSinkTaskSpy.open(tps);
|
||||
|
||||
assertEquals(kustoSinkTaskSpy.writers.size(), 3);
|
||||
assertEquals(3, kustoSinkTaskSpy.writers.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSinkTaskPutRecord() throws Exception {
|
||||
HashMap<String, String> props = new HashMap<>();
|
||||
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
|
||||
props.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, System.getProperty("java.io.tmpdir"));
|
||||
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'testing1','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
public void testSinkTaskPutRecord() {
|
||||
HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
|
||||
kustoSinkTaskSpy.start(props);
|
||||
kustoSinkTaskSpy.start(configs);
|
||||
|
||||
ArrayList<TopicPartition> tps = new ArrayList<>();
|
||||
TopicPartition tp = new TopicPartition("topic1", 1);
|
||||
|
@ -84,68 +70,55 @@ public class KustoSinkTaskTest {
|
|||
|
||||
kustoSinkTaskSpy.open(tps);
|
||||
|
||||
List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
List<SinkRecord> records = new ArrayList<>();
|
||||
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
|
||||
|
||||
kustoSinkTaskSpy.put(records);
|
||||
|
||||
assertEquals(kustoSinkTaskSpy.writers.get(tp).currentOffset, 10);
|
||||
assertEquals(10, kustoSinkTaskSpy.writers.get(tp).currentOffset);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSinkTaskPutRecordMissingPartition() throws Exception {
|
||||
HashMap<String, String> props = new HashMap<>();
|
||||
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
|
||||
props.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, System.getProperty("java.io.tmpdir"));
|
||||
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
public void testSinkTaskPutRecordMissingPartition() {
|
||||
HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
|
||||
configs.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, System.getProperty("java.io.tmpdir"));
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
|
||||
kustoSinkTaskSpy.start(props);
|
||||
kustoSinkTaskSpy.start(configs);
|
||||
|
||||
ArrayList<TopicPartition> tps = new ArrayList<>();
|
||||
tps.add(new TopicPartition("topic1", 1));
|
||||
|
||||
kustoSinkTaskSpy.open(tps);
|
||||
|
||||
List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
List<SinkRecord> records = new ArrayList<>();
|
||||
|
||||
records.add(new SinkRecord("topic2", 1, null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
|
||||
|
||||
Throwable exception = assertThrows(ConnectException.class, () -> kustoSinkTaskSpy.put(records));
|
||||
|
||||
assertEquals(exception.getMessage(), "Received a record without a mapped writer for topic:partition(topic2:1), dropping record.");
|
||||
|
||||
assertEquals("Received a record without a mapped writer for topic:partition(topic2:1), dropping record.", exception.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getTable() {
|
||||
HashMap<String, String> props = new HashMap<>();
|
||||
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
|
||||
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
|
||||
kustoSinkTaskSpy.start(props);
|
||||
kustoSinkTaskSpy.start(configs);
|
||||
{
|
||||
// single table mapping should cause all topics to be mapped to a single table
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDatabaseName(), "db1");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getTableName(), "table1");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDataFormat(), "csv");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDatabaseName(), "db2");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getTableName(), "table2");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat(), "json");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference(), "Mapping");
|
||||
Assert.assertEquals("db1", kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDatabaseName());
|
||||
Assert.assertEquals("table1", kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getTableName());
|
||||
Assert.assertEquals("csv", kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDataFormat());
|
||||
Assert.assertEquals("db2", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDatabaseName());
|
||||
Assert.assertEquals("table2", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getTableName());
|
||||
Assert.assertEquals("json", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat());
|
||||
Assert.assertEquals("Mapping", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference());
|
||||
Assert.assertNull(kustoSinkTaskSpy.getIngestionProps("topic3"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче