Родитель
11e89c6cd0
Коммит
fa80b4ccda
|
@ -105,11 +105,9 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
|
||||
public static ConfigDef getConfig() {
|
||||
ConfigDef result = new ConfigDef();
|
||||
|
||||
defineConnectionConfigs(result);
|
||||
defineWriteConfigs(result);
|
||||
defineErrorHandlingAndRetriesConfigs(result);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -294,7 +292,8 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
Type.STRING,
|
||||
KustoAuthenticationStrategy.APPLICATION.name(),
|
||||
ConfigDef.ValidString.in(
|
||||
KustoAuthenticationStrategy.APPLICATION.name(), KustoAuthenticationStrategy.MANAGED_IDENTITY.name(),
|
||||
KustoAuthenticationStrategy.APPLICATION.name(),
|
||||
KustoAuthenticationStrategy.MANAGED_IDENTITY.name(),
|
||||
KustoAuthenticationStrategy.APPLICATION.name().toLowerCase(),
|
||||
KustoAuthenticationStrategy.MANAGED_IDENTITY.name().toLowerCase()),
|
||||
Importance.HIGH,
|
||||
|
@ -305,10 +304,6 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
KUSTO_AUTH_STRATEGY_DISPLAY);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println(getConfig().toEnrichedRst());
|
||||
}
|
||||
|
||||
public String getKustoIngestUrl() {
|
||||
return this.getString(KUSTO_INGEST_URL_CONF);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.microsoft.azure.kusto.kafka.connect.sink;
|
||||
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
|
@ -54,15 +55,30 @@ public class KustoSinkConnectorConfigTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowExceptionWhenAppIdNotGiven() {
|
||||
public void shouldThrowExceptionWhenAppIdNotGivenForApplicationAuth() {
|
||||
// Adding required Configuration with no default value.
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_STRATEGY_CONF, KustoSinkConfig.KustoAuthenticationStrategy.APPLICATION.name());
|
||||
settings.remove(KustoSinkConfig.KUSTO_AUTH_APPID_CONF);
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
// In the previous PR this behavior was changed. The default was to use APPLICATION auth, but it also permits
|
||||
// MI. In case of MI the App-ID/App-KEY became optional
|
||||
Assertions.assertThrows(ConfigException.class, () -> {
|
||||
new KustoSinkConfig(settings);
|
||||
KustoSinkTask.createKustoEngineConnectionString(config,config.getKustoEngineUrl());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotThrowExceptionWhenAppIdNotGivenForManagedIdentity() {
|
||||
// Same test as above. In this case since the Auth method is MI AppId/Key becomes optional.
|
||||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_STRATEGY_CONF, KustoSinkConfig.KustoAuthenticationStrategy.MANAGED_IDENTITY.name());
|
||||
settings.remove(KustoSinkConfig.KUSTO_AUTH_APPID_CONF);
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
ConnectionStringBuilder kcsb = KustoSinkTask.createKustoEngineConnectionString(config,config.getKustoEngineUrl());
|
||||
Assertions.assertNotNull(kcsb);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldFailWhenBehaviorOnErrorIsIllConfigured() {
|
||||
// Adding required Configuration with no default value.
|
||||
|
@ -80,7 +96,6 @@ public class KustoSinkConnectorConfigTest {
|
|||
settings.put(KustoSinkConfig.KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF, "localhost:8081,localhost:8082");
|
||||
settings.put(KustoSinkConfig.KUSTO_DLQ_TOPIC_NAME_CONF, "dlq-error-topic");
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
|
||||
Assertions.assertTrue(config.isDlqEnabled());
|
||||
Assertions.assertEquals(Arrays.asList("localhost:8081", "localhost:8082"), config.getDlqBootstrapServers());
|
||||
Assertions.assertEquals("dlq-error-topic", config.getDlqTopicName());
|
||||
|
@ -92,13 +107,9 @@ public class KustoSinkConnectorConfigTest {
|
|||
HashMap<String, String> settings = setupConfigs();
|
||||
settings.put("misc.deadletterqueue.security.protocol", "SASL_PLAINTEXT");
|
||||
settings.put("misc.deadletterqueue.sasl.mechanism", "PLAIN");
|
||||
|
||||
KustoSinkConfig config = new KustoSinkConfig(settings);
|
||||
|
||||
Assertions.assertNotNull(config);
|
||||
|
||||
Properties dlqProps = config.getDlqProps();
|
||||
|
||||
Assertions.assertEquals("SASL_PLAINTEXT", dlqProps.get("security.protocol"));
|
||||
Assertions.assertEquals("PLAIN", dlqProps.get("sasl.mechanism"));
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче